Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1467143192


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
   +- Exchange(distribution=[hash[b]])
  +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
 +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


lsyldliu closed pull request #24162: [FLINK-34100][table] Support session 
window table function without pulling up with window agg
URL: https://github.com/apache/flink/pull/24162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466526704


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
   +- Exchange(distribution=[hash[b]])
  +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
 +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466235244


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java:
##
@@ -170,4 +202,17 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
 }
 return new SlicingWindowOperator<>(windowProcessor);
 }
+
+@SuppressWarnings("unchecked")
+private WindowOperatorBase buildUnslicingWindowOperator() {
+final UnsliceWindowAggProcessor windowProcessor =
+new UnsliceWindowAggProcessor(
+(GeneratedNamespaceAggsHandleFunction)
+generatedAggregateFunction,
+(UnsliceAssigner) assigner,
+accSerializer,
+indexOfCountStart,
+shiftTimeZone);
+return new UnslicingWindowOperator<>(windowProcessor);

Review Comment:
   I think it's unnecessary to couple UnslicingWindowOperator this tightly to session windows here.
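A minimal sketch of the decoupling idea, using simplified stand-in types: `Assigner`, `SliceAssigner`, `UnsliceAssigner`, `TumbleAssigner` and `SessionAssigner` below are hypothetical and are not Flink's real classes. The builder dispatches on whether the assigner is slicing or unslicing, so the unslicing path is not tied to session windows in particular.

```java
/**
 * Hypothetical sketch: choose the window operator by the kind of assigner
 * (slicing vs. unslicing), not by a concrete window type such as session.
 * All types are simplified stand-ins, not Flink's real classes.
 */
public class WindowOperatorChooser {

    /** Stand-in for a window assigner. */
    interface Assigner {}

    /** Aligned windows that can share slices (e.g. TUMBLE/HOP/CUMULATE). */
    interface SliceAssigner extends Assigner {}

    /** Unaligned windows whose bounds depend on the data (e.g. SESSION). */
    interface UnsliceAssigner extends Assigner {}

    static final class TumbleAssigner implements SliceAssigner {}

    static final class SessionAssigner implements UnsliceAssigner {}

    static String buildOperator(Assigner assigner) {
        if (assigner instanceof SliceAssigner) {
            return "SlicingWindowOperator";
        }
        if (assigner instanceof UnsliceAssigner) {
            // Any unslice assigner takes this path; nothing here is session-specific.
            return "UnslicingWindowOperator";
        }
        throw new IllegalArgumentException("Unsupported assigner: " + assigner);
    }

    public static void main(String[] args) {
        System.out.println(buildOperator(new TumbleAssigner()));  // SlicingWindowOperator
        System.out.println(buildOperator(new SessionAssigner())); // UnslicingWindowOperator
    }
}
```

With this shape, a future unaligned window type only needs to implement the unslicing interface; the builder itself stays unchanged.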






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466230991


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java:
##
@@ -103,4 +111,67 @@ protected Transformation translateToPlanInternal(
 inputTransform.getParallelism(),
 false);
 }
+
+private Transformation translateWithUnalignedWindow(
+PlannerBase planner,
+ExecNodeConfig config,
+RowType inputRowType,
+Transformation inputTransform) {
+final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+createUnalignedWindowTableFunctionOperator(config, inputRowType);
+final OneInputTransformation transform =
+ExecNodeUtil.createOneInputTransformation(
+inputTransform,
+createTransformationMeta(WINDOW_TRANSFORMATION, config),
+windowTableFunctionOperator,
+InternalTypeInfo.of(getOutputType()),
+inputTransform.getParallelism(),
+false);
+
+final int[] partitionKeys = extractPartitionKeys(windowingStrategy.getWindow());

Review Comment:
   If the user doesn't specify partition keys, an EmptyRowDataKeySelector will be returned.
   I'll split this function into StreamXXX and BatchXXX variants.
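A hedged sketch of what such a stream/batch split could decide. Rows are modelled as `Object[]` and keys as `List<Object>`; `KeySelectorChoice` and `stateKeySelector` are hypothetical helpers, not the planner's code. The empty key list plays the role the comment attributes to `EmptyRowDataKeySelector`: every row maps to the same key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
 * Hypothetical sketch of a stream/batch split for the state key selector.
 * Rows are modelled as Object[] and keys as List<Object>; not Flink's API.
 */
public class KeySelectorChoice {

    static Optional<Function<Object[], List<Object>>> stateKeySelector(
            int[] partitionKeys, boolean isStreaming) {
        if (!isStreaming) {
            // Batch: no keyed state in this model, so no state key selector.
            return Optional.empty();
        }
        if (partitionKeys.length == 0) {
            // Streaming without PARTITION BY: every row maps to the same empty key.
            return Optional.of(row -> Collections.emptyList());
        }
        // Streaming with PARTITION BY: project the partition key fields.
        int[] keys = partitionKeys.clone();
        return Optional.of(
                row -> {
                    List<Object> key = new ArrayList<>(keys.length);
                    for (int index : keys) {
                        key.add(row[index]);
                    }
                    return key;
                });
    }

    public static void main(String[] args) {
        Object[] row = {"a", 42, 1000L};
        System.out.println(stateKeySelector(new int[] {0}, true).get().apply(row)); // [a]
        System.out.println(stateKeySelector(new int[0], true).get().apply(row));    // []
        System.out.println(stateKeySelector(new int[] {0}, false).isPresent());     // false
    }
}
```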






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466223510


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java:
##
@@ -91,28 +83,24 @@ public void open() throws Exception {
 }
 
 @Override
-public void processElement(StreamRecord element) throws Exception {
-RowData inputRow = element.getValue();
-long timestamp;
-if (windowAssigner.isEventTime()) {
-if (inputRow.isNullAt(rowtimeIndex)) {
-// null timestamp would be dropped
-return;
-}
-timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond();
-} else {
-timestamp = getProcessingTimeService().getCurrentProcessingTime();
+public void close() throws Exception {
+super.close();
+if (collector != null) {
+collector.close();
 }
-timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);
-Collection elementWindows = windowAssigner.assignWindows(inputRow, timestamp);
-for (TimeWindow window : elementWindows) {
+}
+
+protected void collect(RowData inputRow, Collection allWindows) {
+for (TimeWindow window : allWindows) {
 windowProperties.setField(0, TimestampData.fromEpochMillis(window.getStart()));
 windowProperties.setField(1, TimestampData.fromEpochMillis(window.getEnd()));
 windowProperties.setField(
 2,
 TimestampData.fromEpochMillis(
 toEpochMills(window.maxTimestamp(), shiftTimeZone)));
-collector.collect(outRow.replace(inputRow, windowProperties));
+outRow.replace(inputRow, windowProperties);
+outRow.setRowKind(inputRow.getRowKind());

Review Comment:
   Because otherwise the RowKind of the output data would always be INSERT.
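A simplified model of why the extra `setRowKind` call matters. `InputRow` and `JoinedRow` below are hypothetical stand-ins for a reusable output row whose own row kind defaults to INSERT and, in this model, is left untouched by `replace`, which is the behavior the comment describes.

```java
/**
 * Hypothetical model of a reusable output row that wraps an input row plus
 * window properties and keeps its own row kind (INSERT by default).
 * These are stand-ins, not Flink's RowData classes.
 */
public class RowKindPropagation {

    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class InputRow {
        final String payload;
        final RowKind kind;

        InputRow(String payload, RowKind kind) {
            this.payload = payload;
            this.kind = kind;
        }
    }

    static final class JoinedRow {
        private RowKind kind = RowKind.INSERT; // independent of the wrapped rows
        private InputRow input;
        private String windowProperties;

        /** Swaps the wrapped rows; in this model the row kind is deliberately left unchanged. */
        JoinedRow replace(InputRow input, String windowProperties) {
            this.input = input;
            this.windowProperties = windowProperties;
            return this;
        }

        void setRowKind(RowKind kind) {
            this.kind = kind;
        }

        RowKind getRowKind() {
            return kind;
        }
    }

    public static void main(String[] args) {
        InputRow retraction = new InputRow("k1", RowKind.DELETE);
        JoinedRow out = new JoinedRow();

        out.replace(retraction, "[00:00, 00:10)");
        System.out.println(out.getRowKind()); // INSERT: the DELETE is silently lost

        out.setRowKind(retraction.kind); // copy the kind from the input row
        System.out.println(out.getRowKind()); // DELETE
    }
}
```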






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-25 Thread via GitHub


lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1465992611


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java:
##
@@ -103,4 +111,67 @@ protected Transformation translateToPlanInternal(
 inputTransform.getParallelism(),
 false);
 }
+
+private Transformation translateWithUnalignedWindow(
+PlannerBase planner,
+ExecNodeConfig config,
+RowType inputRowType,
+Transformation inputTransform) {
+final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+createUnalignedWindowTableFunctionOperator(config, inputRowType);
+final OneInputTransformation transform =
+ExecNodeUtil.createOneInputTransformation(
+inputTransform,
+createTransformationMeta(WINDOW_TRANSFORMATION, config),
+windowTableFunctionOperator,
+InternalTypeInfo.of(getOutputType()),
+inputTransform.getParallelism(),
+false);
+
+final int[] partitionKeys = extractPartitionKeys(windowingStrategy.getWindow());

Review Comment:
   If the user doesn't specify a partition key in the session TVF, do we need to set the state KeySelector? The session window TVF operator is stateful. What I mean is: if the user doesn't specify a key, the session operator becomes a singleton, so what is the state key? Moreover, batch mode doesn't need to set a KeySelector.
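To illustrate the singleton concern, a simplified sketch of routing rows to parallel subtasks by state key. The `hashCode() mod parallelism` scheme is an assumption for illustration only (Flink's actual key-group assignment differs), and `SingletonRouting` is a hypothetical class. When every row carries the same empty key, all rows, and therefore all session state, land on a single subtask.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical sketch of routing rows to parallel subtasks by state key.
 * Uses hashCode() mod parallelism for simplicity; Flink's real key-group
 * assignment is different.
 */
public class SingletonRouting {

    static int subtaskFor(List<?> key, int parallelism) {
        // floorMod keeps the index non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static Set<Integer> usedSubtasks(List<? extends List<?>> keys, int parallelism) {
        Set<Integer> used = new HashSet<>();
        for (List<?> key : keys) {
            used.add(subtaskFor(key, parallelism));
        }
        return used;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        // No PARTITION BY: every row carries the same empty key.
        List<List<Object>> unpartitioned = Collections.nCopies(8, List.of());
        // PARTITION BY a column with four distinct values.
        List<List<String>> partitioned =
                List.of(List.of("a"), List.of("b"), List.of("c"), List.of("d"));

        System.out.println(usedSubtasks(unpartitioned, parallelism).size()); // 1
        System.out.println(usedSubtasks(partitioned, parallelism).size());   // 4
    }
}
```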



##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import 

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464907031


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala:
##
@@ -1327,4 +1328,146 @@ class WindowTableFunctionITCase extends BatchTestBase {
   )
 )
   }
+
+  @Test
+  def testSessionTVF(): Unit = {

Review Comment:
   I found that session windows are currently not supported in batch mode, so I think it's better not to support them in window TVF either. I'll update this PR.






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464873767


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator for unaligned window table function.
+ *
+ * See more details about aligned window and unaligned window in {@link
+ * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * Note: The operator only applies for Window TVF with set semantics (e.g SESSION) instead of row
+ * semantics (e.g TUMBLE/HOP/CUMULATE).
+ *
+ * This operator emits result at the end of window instead of per record.
+ *
+ * This operator will not compact changelog records.
+ *
+ * This operator will keep the original order of input records when outputting.
+ */
+public class UnalignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase
+implements Triggerable {
+
+private static final long serialVersionUID = 1L;
+
+private final Trigger trigger;
+
+private final LogicalType[] inputFieldTypes;
+
+private final TypeSerializer windowSerializer;
+
+private transient InternalTimerService internalTimerService;
+
+// a counter to tag the order of all input streams when entering the operator
+private transient ValueState counterState;
+
+private transient InternalMapState windowState;
+
+private 

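A self-contained model of the semantics the Javadoc above describes (results emitted at the end of a window rather than per record, original input order preserved via an arrival counter), for a single partition. `SessionTvfSketch`, its merging rule (merge sessions whose `[start, end)` ranges overlap), its firing rule (watermark reaches `end - 1`) and its string output format are assumptions for illustration, not the operator's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical single-partition model of a session window TVF: rows are buffered
 * per session, overlapping sessions are merged, and when the watermark passes a
 * session's last timestamp its rows are emitted in arrival order.
 */
public class SessionTvfSketch {

    static final class Session {
        long start;
        long end; // exclusive
        // arrival counter -> row, so iteration order is arrival order
        final TreeMap<Long, String> rowsByArrival = new TreeMap<>();

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final long gap;
    private long counter = 0; // tags the arrival order of every input row
    // session start -> session; sessions are kept disjoint by merging
    private final TreeMap<Long, Session> sessions = new TreeMap<>();

    SessionTvfSketch(long gap) {
        this.gap = gap;
    }

    void processElement(String row, long timestamp) {
        Session merged = new Session(timestamp, timestamp + gap);
        merged.rowsByArrival.put(counter++, row);
        Iterator<Session> it = sessions.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.start < merged.end && merged.start < s.end) { // ranges overlap
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.rowsByArrival.putAll(s.rowsByArrival);
                it.remove();
            }
        }
        sessions.put(merged.start, merged);
    }

    List<String> onWatermark(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<Long, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Session s = it.next().getValue();
            if (s.end - 1 > watermark) {
                break; // disjoint and sorted by start, so later sessions end later too
            }
            for (String row : s.rowsByArrival.values()) {
                out.add(row + ", window_start=" + s.start + ", window_end=" + s.end);
            }
            it.remove();
        }
        return out;
    }

    public static void main(String[] args) {
        SessionTvfSketch op = new SessionTvfSketch(10);
        op.processElement("r1", 5);  // session [5, 15)
        op.processElement("r2", 30); // session [30, 40)
        op.processElement("r3", 2);  // overlaps [5, 15), merged into [2, 15)
        op.onWatermark(20).forEach(System.out::println);
        // r1, window_start=2, window_end=15
        // r3, window_start=2, window_end=15
        op.onWatermark(50).forEach(System.out::println);
        // r2, window_start=30, window_end=40
    }
}
```

Note that `r3` has an earlier timestamp than `r1` but is emitted after it, because the buffer is ordered by arrival rather than by event time.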
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464871853


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala:
##
@@ -1327,4 +1328,146 @@ class WindowTableFunctionITCase extends BatchTestBase {
   )
 )
   }
+
+  @Test
+  def testSessionTVF(): Unit = {

Review Comment:
   To be clear, I'm asking about session window agg, not window TVF: does it work in batch mode?






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464774651


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##
@@ -1186,6 +1186,196 @@ Calc(select=[window_time, start_time, end_time])
  +- Calc(select=[CAST(rowtime AS TIMESTAMP(3)) AS rowtime, rowtime AS rowtime_0])
 +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>

Review Comment:
   https://github.com/apache/flink/pull/23505/files






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464774215


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##
@@ -1186,6 +1186,196 @@ Calc(select=[window_time, start_time, end_time])
  +- Calc(select=[CAST(rowtime AS TIMESTAMP(3)) AS rowtime, rowtime AS rowtime_0])
 +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>

Review Comment:
   Good catch! Actually, I have already added a planner test for this in the PR that this PR is based on.
   
![image](https://github.com/apache/flink/assets/25195874/af99e4f7-1970-46bc-b290-11b6fb8b3208)
   
![image](https://github.com/apache/flink/assets/25195874/3c2c9fec-fcef-4a22-b774-529a1ba9d5a8)
   






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464769669


##
flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml:
##
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
   +- Exchange(distribution=[hash[b]])
  +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
 +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464766029


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala:
##
@@ -1327,4 +1328,146 @@ class WindowTableFunctionITCase extends BatchTestBase {
   )
 )
   }
+
+  @Test
+  def testSessionTVF(): Unit = {

Review Comment:
   I think it should behave the same as the other window TVFs.






Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464763918


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.window.tvf.operator;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore;
+import org.apache.flink.table.runtime.dataview.StateDataViewStore;
+import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase;
+import org.apache.flink.table.runtime.operators.window.TimeWindow;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner;
+import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers;
+import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase;
+import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The operator for the unaligned window table function.
+ *
+ * See more details about aligned and unaligned windows in {@link
+ * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}.
+ *
+ * Note: The operator only applies to Window TVFs with set semantics (e.g., SESSION) rather than row
+ * semantics (e.g., TUMBLE/HOP/CUMULATE).
+ *
+ * This operator emits results at the end of each window instead of per record.
+ *
+ * This operator does not compact changelog records.
+ *
+ * This operator preserves the original order of input records when outputting.
+ */
+public class UnalignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase
+        implements Triggerable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final Trigger trigger;
+
+    private final LogicalType[] inputFieldTypes;
+
+    private final TypeSerializer windowSerializer;
+
+    private transient InternalTimerService internalTimerService;
+
+    // a counter that tags the order in which input records enter the operator
+    private transient ValueState counterState;
+
+    private transient InternalMapState windowState;
+
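
The Javadoc above states that the operator keeps the original order of input records, and the quoted fields pair an order counter (`counterState`) with a per-window map state (`windowState`). The following is a hypothetical, plain-Java model of that idea, not Flink's implementation: it assumes the counter is scoped to one key, tags every row with the next counter value, buffers rows per window in a `TreeMap` keyed by that tag, combines buffers when windows merge, and emits rows in arrival order when a window fires. The class name `OrderPreservingSessionBuffer` and the use of `String` for rows and window IDs are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of one key's state using plain collections (not Flink state).
final class OrderPreservingSessionBuffer {
    // Arrival counter for this key; plays the role of counterState.
    private long nextOrder = 0L;

    // Window id -> (arrival order -> row); plays the role of windowState.
    private final Map<String, TreeMap<Long, String>> windows = new HashMap<>();

    // Tags the row with the next arrival number and buffers it in its window.
    void add(String windowId, String row) {
        windows.computeIfAbsent(windowId, id -> new TreeMap<>()).put(nextOrder++, row);
    }

    // Moves all rows of `from` into `into`; the tags keep the combined rows sortable.
    void merge(String from, String into) {
        TreeMap<Long, String> source = windows.remove(from);
        if (source != null) {
            windows.computeIfAbsent(into, id -> new TreeMap<>()).putAll(source);
        }
    }

    // Fires a window: returns its rows in arrival order and drops its buffer.
    List<String> fire(String windowId) {
        TreeMap<Long, String> buffer = windows.remove(windowId);
        if (buffer == null) {
            return new ArrayList<>();
        }
        return new ArrayList<>(buffer.values());
    }
}
```

For example, if rows `a`, `b` and `c` arrive in that order, with `b` first landing in a separate window that is later merged into the window holding `a` and `c`, firing the merged window returns `a`, `b`, `c`, because the sorted map orders rows by their arrival tag rather than by which window buffered them.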
+

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-24 Thread via GitHub


xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464755151


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##
@@ -0,0 +1,496 @@
+

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-23 Thread via GitHub


lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1462995182


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java:
##
@@ -0,0 +1,496 @@

Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-21 Thread via GitHub


flinkbot commented on PR #24162:
URL: https://github.com/apache/flink/pull/24162#issuecomment-1903131997

   
   ## CI report:
   
   * 5b90be3d7e080cfaea62a1711406a21319b415a2 UNKNOWN
   
   
   Bot commands
   
   The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure`: re-run the last Azure build
   





[PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]

2024-01-21 Thread via GitHub


xuyangzhong opened a new pull request, #24162:
URL: https://github.com/apache/flink/pull/24162

   ## What is the purpose of the change
   
   Introduce a windowed unslicing window assigner, and support the session window table function without pulling it up into a window aggregate.
   
   Note: this is a draft PR; only the commits whose messages start with '[FLINK-34100]' are new commits related to this PR.
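
Session windows are the set-semantics window TVF this PR targets. As a hypothetical, simplified model (not the code of Flink's `MergingWindowAssigner`), the sketch below gives every event the window `[ts, ts + gap)`, sorts the windows by start, and merges windows that overlap or touch into one session. The class `SessionWindowsSketch` and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, simplified model of session window assignment and merging.
final class SessionWindowsSketch {

    // A window covering [start, end) in epoch milliseconds.
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        // True if the windows overlap or touch, so they belong to one session.
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        // The smallest window covering both inputs.
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }
    }

    // Assigns [ts, ts + gap) to every event, then merges the windows into sessions.
    static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long ts : timestamps) {
            windows.add(new Window(ts, ts + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

For example, events at 1000, 3000 and 10000 ms with a 5000 ms gap yield two sessions, `[1000, 8000)` and `[10000, 15000)`. Because a session's bounds are only known once no further event extends it, rows cannot be emitted per record the way row-semantics windows (TUMBLE/HOP/CUMULATE) allow.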
   
   ## Brief change log
   
 - *add a `getDescription` method to the internal interface `WindowAssigner`*
 - *extract a `WindowTableFunctionOperatorBase` from `WindowTableFunctionOperator` to prepare for introducing the unaligned window table function*
 - *introduce `UnalignedWindowTableFunctionOperator` for unaligned windows*
 - *fix the missing exchange before the window table function node*
 - *support the session window table function without pulling it up into a window aggregate*
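
One change-log item fixes a missing exchange before the window table function node. The sessions of one key can only be merged correctly if every row with that key is processed by the same parallel subtask, which is what a hash exchange on the key guarantees. The hypothetical sketch below models that routing with `Math.floorMod(key.hashCode(), parallelism)`; Flink's real keyed exchange routes through key groups, so this illustrates the invariant rather than Flink's partitioner. `HashExchangeSketch` is an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hash exchange: rows with equal keys always land on the same subtask.
final class HashExchangeSketch {

    // Picks a subtask from the key's hash; floorMod keeps the index non-negative.
    static int targetSubtask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes {key, value} rows into one bucket per subtask.
    static List<List<String[]>> route(List<String[]> rows, int parallelism) {
        List<List<String[]>> buckets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            buckets.add(new ArrayList<>());
        }
        for (String[] row : rows) {
            buckets.get(targetSubtask(row[0], parallelism)).add(row);
        }
        return buckets;
    }
}
```

Without such an exchange, two rows of the same key could be read by different subtasks, and each subtask would see only part of the session, producing split or incorrectly bounded windows.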
   
   ## Verifying this change
   
   Several harness tests and ITCases are added.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? In a separate follow-up PR.
   

