Hi Sumeet,

1) Regarding the above exception: it's a known issue and has been fixed in FLINK-21922 [1]. The fix will be available in the upcoming 1.12.3 release. If needed, you could also cherry-pick the fix onto 1.12.2 and build from source following the instructions in [2].

2) Regarding your requirements: could you describe what you want to do with the group window or over window? A group window (e.g. tumble window, hop window, session window, etc.) outputs one row for the multiple input rows belonging to the same window. You cannot simply pass a column through from input to sink, because with multiple input rows it is non-deterministic which row's value would be used. That's the reason why you have to declare a field in the group by clause if you want to access it directly in the select clause. An over window outputs one row for each input row, so you can pass the column through directly.

[1] https://issues.apache.org/jira/browse/FLINK-21922
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink

> On Apr 19, 2021, at 5:16 PM, Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
>
> Thanks Guowei. I'm trying out Over Windows, as follows:
>
> input \
>     .over_window(
>         Over.partition_by(col(input.a)) \
>             .order_by(input.Timestamp) \
>             .preceding(lit(10).seconds) \
>             .alias('w')) \
>     .select(
>         input.b,
>         input.c.avg.over(col('w'))) \
>     .execute_insert('MySink') \
>     .wait()
>
> But running into following exception:
>
> py4j.protocol.Py4JError: An error occurred while calling
> z:org.apache.flink.table.api.Over.partitionBy. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method
> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
>
> Is there any extra Jar that needs to be included for Over Windows. From the
> code it doesn't appear so.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com> wrote:
> Hi, Sumeet
>
> Maybe you could try the Over Windows[1], which could keep the "non-group-key"
> column.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any
> issues. It's only when I want to use "input.b".
>
> My use case is to basically emit "input.b" in the final sink as is, and not
> really perform any aggregation on that column - more like pass through from
> input to sink. What's the best way to achieve this? I was thinking that
> making it part of the select() clause would do it, but as you said there
> needs to be some aggregation performed on it.
>
> Thanks,
> Sumeet
>
>
> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com> wrote:
> Hi, Sumeet
> For "input.b" I think you should aggregate the non-group-key column[1].
> But I am not sure why the "input.c.avg.alias('avg_value')" has resolved
> errors. Would you mind giving more detailed error information?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>
> Best,
> Guowei
>
>
> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com> wrote:
> Hi,
>
> I have a use case where I'm creating a Tumbling window as follows:
>
> "input" table has columns [Timestamp, a, b, c]
>
> input \
>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>     .group_by(col('w'), input.a) \
>     .select(
>         col('w').start.alias('window_start'),
>         col('w').end.alias('window_end'),
>         input.b,
>         input.c.avg.alias('avg_value')) \
>     .execute_insert('MySink') \
>     .wait()
>
> This throws an exception that it cannot resolve the fields "b" and "c" inside
> the select statement. If I mention these column names inside the group_by()
> statement as follows:
>
>     .group_by(col('w'), input.a, input.b, input.c)
>
> then the column names in the subsequent select statement can be resolved.
>
> Basically, unless the column name is explicitly made part of the group_by()
> clause, the subsequent select() clause doesn't resolve it. This is very
> similar to the example from Flink's documentation here [1]:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples,
> where a similar procedure works.
>
> Any idea how I can access columns from the input stream, without having to
> mention them in the group_by() clause? I really don't want to group the
> results by those fields, but they need to be written to the sink eventually.
>
> Thanks,
> Sumeet
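
To illustrate the group window vs. over window difference described in point 2 of the reply at the top of this thread, here is a rough plain-Python sketch. It does not use the Flink API at all: the sample rows and the helper names tumble_avg and over_avg are made up for illustration, and the window semantics are simplified (integer timestamps in seconds, no watermarks or late data).

```python
from collections import defaultdict

# Hypothetical sample rows with the columns (Timestamp, a, b, c) from the thread.
rows = [
    (1, "k1", "x", 10.0),
    (4, "k1", "y", 20.0),
    (12, "k1", "z", 30.0),
]


def tumble_avg(rows, size):
    """Simplified group-window semantics: one output row per (window, key).

    Column b cannot be emitted here, because several input rows with
    different b values may fall into the same (window, a) group.
    """
    groups = defaultdict(list)
    for ts, a, _b, c in rows:
        window_start = ts - ts % size
        groups[(window_start, a)].append(c)
    return [
        (start, start + size, a, sum(cs) / len(cs))
        for (start, a), cs in sorted(groups.items())
    ]


def over_avg(rows, preceding):
    """Simplified over-window semantics: one output row per input row.

    Each output row corresponds to exactly one input row, so b can be
    passed through unchanged next to the aggregate.
    """
    out = []
    for ts, a, b, _c in rows:
        in_range = [
            c2 for ts2, a2, _b2, c2 in rows
            if a2 == a and ts - preceding <= ts2 <= ts
        ]
        out.append((b, sum(in_range) / len(in_range)))
    return out


print(tumble_avg(rows, 10))  # 2 rows for 3 inputs; b is not available
print(over_avg(rows, 10))    # 3 rows for 3 inputs; b is passed through
```

With these sample rows the tumbling version produces two output rows for three inputs, while the over version produces three, each carrying its own b value.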