Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1467143192

## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml: ##
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
          +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
lsyldliu closed pull request #24162: [FLINK-34100][table] Support session window table function without pulling up with window agg URL: https://github.com/apache/flink/pull/24162 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466526704

## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml: ##
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
          +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466235244

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/WindowAggOperatorBuilder.java: ##
@@ -170,4 +202,17 @@ public SlicingWindowAggOperatorBuilder countStarIndex(int indexOfCountStart) {
         }
         return new SlicingWindowOperator<>(windowProcessor);
     }
+
+    @SuppressWarnings("unchecked")
+    private WindowOperatorBase buildUnslicingWindowOperator() {
+        final UnsliceWindowAggProcessor windowProcessor =
+                new UnsliceWindowAggProcessor(
+                        (GeneratedNamespaceAggsHandleFunction)
+                                generatedAggregateFunction,
+                        (UnsliceAssigner) assigner,
+                        accSerializer,
+                        indexOfCountStart,
+                        shiftTimeZone);
+        return new UnslicingWindowOperator<>(windowProcessor);

Review Comment:
I think it's unnecessary to bind the UnslicingWindowOperator and session window too strongly here.
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466230991

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java: ##
@@ -103,4 +111,67 @@ protected Transformation translateToPlanInternal(
                 inputTransform.getParallelism(),
                 false);
     }
+
+    private Transformation translateWithUnalignedWindow(
+            PlannerBase planner,
+            ExecNodeConfig config,
+            RowType inputRowType,
+            Transformation inputTransform) {
+        final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+                createUnalignedWindowTableFunctionOperator(config, inputRowType);
+        final OneInputTransformation transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        inputTransform,
+                        createTransformationMeta(WINDOW_TRANSFORMATION, config),
+                        windowTableFunctionOperator,
+                        InternalTypeInfo.of(getOutputType()),
+                        inputTransform.getParallelism(),
+                        false);
+
+        final int[] partitionKeys = extractPartitionKeys(windowingStrategy.getWindow());

Review Comment:
If the user doesn't specify partition keys, an EmptyRowDataKeySelector is returned. I'll split this function into StreamXXX and BatchXXX.
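The point about the empty key selector can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `EmptyKeySketch`, `selectKey` and `countPerKey` are hypothetical names and plain Java collections stand in for Flink's key selectors and keyed state. It shows that an empty partition-key array yields one constant (empty) key, so all rows share a single state entry.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model; none of these names are Flink classes.
final class EmptyKeySketch {

    /** Projects the partition-key columns out of a row; an empty index array gives an empty key. */
    static List<Object> selectKey(Object[] row, int[] partitionKeys) {
        List<Object> key = new ArrayList<>(partitionKeys.length);
        for (int index : partitionKeys) {
            key.add(row[index]);
        }
        return key;
    }

    /** Counts rows per key; the map stands in for per-key operator state. */
    static Map<List<Object>, Integer> countPerKey(List<Object[]> rows, int[] partitionKeys) {
        Map<List<Object>, Integer> state = new HashMap<>();
        for (Object[] row : rows) {
            state.merge(selectKey(row, partitionKeys), 1, Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Object[]> rows =
                List.of(new Object[] {"a", 1}, new Object[] {"b", 2}, new Object[] {"a", 3});
        // Partitioned by column 0: one state entry per distinct value.
        System.out.println(countPerKey(rows, new int[] {0}).size());
        // No partition keys: every row maps to the same empty key.
        System.out.println(countPerKey(rows, new int[] {}).size());
    }
}
```

With no partition keys, the state map holds exactly one entry, which matches the "singleton" behavior raised in the review question.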
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1466223510

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/WindowTableFunctionOperatorBase.java: ##
@@ -91,28 +83,24 @@ public void open() throws Exception {
     }

     @Override
-    public void processElement(StreamRecord element) throws Exception {
-        RowData inputRow = element.getValue();
-        long timestamp;
-        if (windowAssigner.isEventTime()) {
-            if (inputRow.isNullAt(rowtimeIndex)) {
-                // null timestamp would be dropped
-                return;
-            }
-            timestamp = inputRow.getTimestamp(rowtimeIndex, 3).getMillisecond();
-        } else {
-            timestamp = getProcessingTimeService().getCurrentProcessingTime();
+    public void close() throws Exception {
+        super.close();
+        if (collector != null) {
+            collector.close();
         }
-        timestamp = toUtcTimestampMills(timestamp, shiftTimeZone);
-        Collection elementWindows = windowAssigner.assignWindows(inputRow, timestamp);
-        for (TimeWindow window : elementWindows) {
+    }
+
+    protected void collect(RowData inputRow, Collection allWindows) {
+        for (TimeWindow window : allWindows) {
             windowProperties.setField(0, TimestampData.fromEpochMillis(window.getStart()));
             windowProperties.setField(1, TimestampData.fromEpochMillis(window.getEnd()));
             windowProperties.setField(
                     2,
                     TimestampData.fromEpochMillis(
                             toEpochMills(window.maxTimestamp(), shiftTimeZone)));
-            collector.collect(outRow.replace(inputRow, windowProperties));
+            outRow.replace(inputRow, windowProperties);
+            outRow.setRowKind(inputRow.getRowKind());

Review Comment:
Because otherwise the RowKind of the output row is always INSERT.
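The effect described in this comment can be shown with a toy model. This is a minimal sketch, not Flink code: `RowKindPropagationSketch`, its nested `Kind` and `Row` types, and `appendWindow` are hypothetical stand-ins, and the assumption (taken from the comment above) is that a freshly assembled output row defaults to INSERT unless the input's kind is copied onto it.

```java
import java.util.Arrays;

// Hypothetical toy model; Kind and Row are stand-ins, not Flink's RowKind or RowData.
final class RowKindPropagationSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static final class Row {
        final Object[] fields;
        Kind kind;

        Row(Kind kind, Object... fields) {
            this.kind = kind;
            this.fields = fields;
        }
    }

    /** Appends window start/end to the input row; copies the input's kind only if asked to. */
    static Row appendWindow(Row input, long start, long end, boolean propagateKind) {
        Object[] out = Arrays.copyOf(input.fields, input.fields.length + 2);
        out[input.fields.length] = start;
        out[input.fields.length + 1] = end;
        // Assumption: a newly built output row starts as INSERT.
        Row result = new Row(Kind.INSERT, out);
        if (propagateKind) {
            result.kind = input.kind;
        }
        return result;
    }

    public static void main(String[] args) {
        Row delete = new Row(Kind.DELETE, "a", 1);
        System.out.println(appendWindow(delete, 0L, 5000L, false).kind); // prints INSERT
        System.out.println(appendWindow(delete, 0L, 5000L, true).kind);  // prints DELETE
    }
}
```

In the sketch, a DELETE input only stays a DELETE downstream when the kind is copied explicitly, which is the role the added `setRowKind` call plays in the diff.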
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1465992611

## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecWindowTableFunction.java: ##
@@ -103,4 +111,67 @@ protected Transformation translateToPlanInternal(
                 inputTransform.getParallelism(),
                 false);
     }
+
+    private Transformation translateWithUnalignedWindow(
+            PlannerBase planner,
+            ExecNodeConfig config,
+            RowType inputRowType,
+            Transformation inputTransform) {
+        final WindowTableFunctionOperatorBase windowTableFunctionOperator =
+                createUnalignedWindowTableFunctionOperator(config, inputRowType);
+        final OneInputTransformation transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        inputTransform,
+                        createTransformationMeta(WINDOW_TRANSFORMATION, config),
+                        windowTableFunctionOperator,
+                        InternalTypeInfo.of(getOutputType()),
+                        inputTransform.getParallelism(),
+                        false);
+
+        final int[] partitionKeys = extractPartitionKeys(windowingStrategy.getWindow());

Review Comment:
If the user doesn't specify a partition key in the session TVF, do we need to set the state KeySelector? The session window TVF operator is stateful. What I mean is: if the user doesn't specify a key, the session operator is a singleton, so what is the state key? Moreover, batch mode doesn't need to set a KeySelector.

## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java: ## @@ -0,0 +1,617 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.MergingState; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.runtime.state.internal.InternalMergingState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; +import org.apache.flink.table.runtime.dataview.StateDataViewStore; +import 
org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.internal.InternalWindowProcessFunction; +import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers; +import
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464907031

## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala: ##
@@ -1327,4 +1328,146 @@ class WindowTableFunctionITCase extends BatchTestBase {
       )
     )
   }
+
+  @Test
+  def testSessionTVF(): Unit = {

Review Comment:
I found that session windows are currently not supported in batch mode, so I think it's better not to support them in the window TVF either. I'll update this PR.
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
lsyldliu commented on code in PR #24162: URL: https://github.com/apache/flink/pull/24162#discussion_r1464873767 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java: ## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; +import org.apache.flink.table.runtime.dataview.StateDataViewStore; +import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger; +import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase; +import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.function.BiConsumerWithException; + +import java.time.ZoneId; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The operator for unaligned window table function. + * + * See more details about aligned window and unaligned window in {@link + * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + * + * Note: The operator only applies for Window TVF with set semantics (e.g SESSION) instead of row + * semantics (e.g TUMBLE/HOP/CUMULATE). + * + * This operator emits result at the end of window instead of per record. + * + * This operator will not compact changelog records. + * + * This operator will keep the original order of input records when outputting. + */ +public class UnalignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase +implements Triggerable { + +private static final long serialVersionUID = 1L; + +private final Trigger trigger; + +private final LogicalType[] inputFieldTypes; + +private final TypeSerializer windowSerializer; + +private transient InternalTimerService internalTimerService; + +// a counter to tag the order of all input streams when entering the operator +private transient ValueState counterState; + +private transient InternalMapState windowState; + +private
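The Javadoc quoted above describes SESSION as a window with set semantics: a row's window depends on the other rows with the same key, not on the row alone. A minimal sketch of that idea follows. `SessionMergeSketch` and `sessions` are hypothetical names, plain `long[]` pairs stand in for `TimeWindow`, and the choice to merge windows that merely touch (not only those that overlap) is an assumption of this sketch rather than something stated in the thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of session-window merging for a single key.
final class SessionMergeSketch {

    /** Returns merged [start, end) sessions for the given timestamps and gap, both in milliseconds. */
    static List<long[]> sessions(long[] timestamps, long gap) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // The element's own window [ts, ts + gap) overlaps or touches the open session.
                last[1] = Math.max(last[1], ts + gap);
            } else {
                // Too far from the previous session: start a new one.
                result.add(new long[] {ts, ts + gap});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        for (long[] s : sessions(new long[] {1000L, 3000L, 20000L}, 5000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
        // prints:
        // [1000, 8000)
        // [20000, 25000)
    }
}
```

Because the final bounds of a session are only known once no further rows can extend it, a result can only be emitted at the end of the window, which is consistent with the emit-at-window-end behavior the Javadoc describes.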
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
lsyldliu commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464871853

## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala: ##
@@ -1327,4 +1328,146 @@ class WindowTableFunctionITCase extends BatchTestBase {
       )
     )
   }
+
+  @Test
+  def testSessionTVF(): Unit = {

Review Comment:
To be clear, I am asking about session window agg, not the window TVF: does it work in batch mode?
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464774651

## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml: ##
@@ -1186,6 +1186,196 @@ Calc(select=[window_time, start_time, end_time])
    +- Calc(select=[CAST(rowtime AS TIMESTAMP(3)) AS rowtime, rowtime AS rowtime_0])
       +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>

Review Comment:
https://github.com/apache/flink/pull/23505/files
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464774215

## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml: ##
@@ -1186,6 +1186,196 @@ Calc(select=[window_time, start_time, end_time])
    +- Calc(select=[CAST(rowtime AS TIMESTAMP(3)) AS rowtime, rowtime AS rowtime_0])
       +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
          +- TableSourceScan(table=[[default_catalog, default_database, MyTable, project=[rowtime], metadata=[]]], fields=[rowtime])
+]]>

Review Comment:
Good catch! Actually, I have already added a planner test for this in the PR that this PR is based on.
![image](https://github.com/apache/flink/assets/25195874/af99e4f7-1970-46bc-b290-11b6fb8b3208)
![image](https://github.com/apache/flink/assets/25195874/3c2c9fec-fcef-4a22-b774-529a1ba9d5a8)
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464769669

## flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.xml: ##
@@ -2170,6 +2360,386 @@ Sink(table=[default_catalog.default_database.sink], fields=[ws, we, b, c])
    +- Exchange(distribution=[hash[b]])
       +- Calc(select=[b, CASE((a = 11), 1, null:INTEGER) AS $f3, PROCTIME() AS proctime], where=[SEARCH(a, Sarg[1, 5, 7, 9, 11])])
          +- TableSourceScan(table=[[default_catalog, default_database, source, project=[a, b], metadata=[]]], fields=[a, b])
+]]>
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162:
URL: https://github.com/apache/flink/pull/24162#discussion_r1464766029

## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/WindowTableFunctionITCase.scala: ##
@@ -1327,4 +1328,146 @@ class WindowTableFunctionITCase extends BatchTestBase {
       )
     )
   }
+
+  @Test
+  def testSessionTVF(): Unit = {

Review Comment:
I think it should behave the same as the other window TVFs.
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162: URL: https://github.com/apache/flink/pull/24162#discussion_r1464763918 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java: ## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.operators.window.tvf.operator; + +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.state.internal.InternalMapState; +import org.apache.flink.streaming.api.operators.InternalTimer; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.Triggerable; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.dataview.PerWindowStateDataViewStore; +import org.apache.flink.table.runtime.dataview.StateDataViewStore; +import org.apache.flink.table.runtime.generated.NamespaceAggsHandleFunctionBase; +import org.apache.flink.table.runtime.operators.window.TimeWindow; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.GroupWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.assigners.MergingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.groupwindow.internal.MergingWindowProcessFunction; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.EventTimeTriggers; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.ProcessingTimeTriggers; +import org.apache.flink.table.runtime.operators.window.groupwindow.triggers.Trigger; +import org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingTriggerContextBase; +import 
org.apache.flink.table.runtime.operators.window.tvf.unslicing.UnslicingWindowContextBase; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.util.function.BiConsumerWithException; + +import java.time.ZoneId; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.TreeMap; + +import static java.util.Objects.requireNonNull; +import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The operator for unaligned window table function. + * + * See more details about aligned window and unaligned window in {@link + * org.apache.flink.table.runtime.operators.window.tvf.common.WindowOperatorBase}. + * + * Note: The operator only applies for Window TVF with set semantics (e.g SESSION) instead of row + * semantics (e.g TUMBLE/HOP/CUMULATE). + * + * This operator emits result at the end of window instead of per record. + * + * This operator will not compact changelog records. + * + * This operator will keep the original order of input records when outputting. + */ +public class UnalignedWindowTableFunctionOperator extends WindowTableFunctionOperatorBase +implements Triggerable { + +private static final long serialVersionUID = 1L; + +private final Trigger trigger; + +private final LogicalType[] inputFieldTypes; + +private final TypeSerializer windowSerializer; + +private transient InternalTimerService internalTimerService; + +// a counter to tag the order of all input streams when entering the operator +private transient ValueState counterState; + +private transient InternalMapState windowState; + +
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong commented on code in PR #24162: URL: https://github.com/apache/flink/pull/24162#discussion_r1464755151 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java: ## @@ -0,0 +1,496 @@
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
lsyldliu commented on code in PR #24162: URL: https://github.com/apache/flink/pull/24162#discussion_r1462995182 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/tvf/operator/UnalignedWindowTableFunctionOperator.java: ## @@ -0,0 +1,496 @@
Re: [PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
flinkbot commented on PR #24162: URL: https://github.com/apache/flink/pull/24162#issuecomment-1903131997 ## CI report: * 5b90be3d7e080cfaea62a1711406a21319b415a2 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-34100][table] Support session window table function without pulling up with window agg [flink]
xuyangzhong opened a new pull request, #24162:
URL: https://github.com/apache/flink/pull/24162

## What is the purpose of the change

Introduce the windowed unslice window assigner, and support the session window table function without pulling it up with the window agg.

Note: this is a draft PR, and only the commits whose messages start with '[FLINK-34100]' are new commits related to this PR.

## Brief change log

- *add function getDescription for internal interface WindowAssigner*
- *extract a WindowTableFunctionOperatorBase from WindowTableFunctionOperator to prepare for introducing unaligned window table function*
- *introduce UnalignedWindowTableFunctionOperator for unaligned window*
- *fix missing exchange before window table function node*
- *support session window table function without pulling up with window agg*

## Verifying this change

Some harness tests and ITCases are added.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? yes
- If yes, how is the feature documented? In a separate, later PR.
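For readers unfamiliar with unaligned windows, the behaviour described in the operator's Javadoc (merge session windows, emit at the end of the window, keep the original input order) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not the code in this PR: `SessionWindowSketch`, `Session`, `add` and `fire` are hypothetical names, state is a plain in-memory list rather than Flink keyed state, and the firing rule `end - 1 <= watermark` is assumed here for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, simplified model of a session window table function; not Flink code. */
final class SessionWindowSketch {

    /** One session window [start, end) with its buffered records keyed by arrival order. */
    static final class Session {
        long start;
        long end;
        final TreeMap<Long, String> records = new TreeMap<>();

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    private final long gap;
    // Counter that tags every input record with its arrival order (assumption: one key only).
    private long counter = 0L;
    private final List<Session> sessions = new ArrayList<>();

    SessionWindowSketch(long gap) {
        this.gap = gap;
    }

    /** Buffers a record and merges every existing session that intersects its window. */
    void add(long timestamp, String record) {
        Session merged = new Session(timestamp, timestamp + gap);
        merged.records.put(counter++, record);
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            // Touching windows are treated as intersecting in this sketch.
            if (s.start <= merged.end && s.end >= merged.start) {
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.records.putAll(s.records);
                it.remove();
            }
        }
        sessions.add(merged);
    }

    /** Emits the records of every finished session, in arrival order, with the window bounds. */
    List<String> fire(long watermark) {
        List<String> out = new ArrayList<>();
        Iterator<Session> it = sessions.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end - 1 <= watermark) {
                for (String r : s.records.values()) {
                    out.add(r + " [" + s.start + ", " + s.end + ")");
                }
                it.remove();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SessionWindowSketch op = new SessionWindowSketch(5);
        op.add(10, "b");
        op.add(1, "a");
        op.add(8, "c"); // merges with the window of "b"
        System.out.println(op.fire(20));
    }
}
```

Nothing is emitted when a record arrives; rows only appear once `fire` is called with a watermark past the end of the merged window, and each row carries the final window bounds. The `TreeMap` keyed by the counter is what keeps records within a window in their arrival order after a merge.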