Hi Sumeet,

1) Regarding to the above exception, it’s a known issue and has been fixed in 
FLINK-21922 <https://issues.apache.org/jira/browse/FLINK-21922> [1]. It will be 
available in the coming 1.12.3. You could also cherry-pick that fix to 1.12.2 
and build from source following the instruction described in [2] if needed.

2) Regarding to your requirements, could you describe what you want to do with 
group window or over window? 
For group window(e.g. tumble window, hop window, session window, etc), it will 
output one row for multiple inputs belonging to the same window. You could not 
just passing through it from input to sink as it is non-determinitic which row 
to use as there are multiple input rows. That’s the reason why you have to 
declare a field in the group by clause if you want to access it directly in the 
select clause. For over window, it will output one row for each input and so 
you could pass through it directly.

[1] https://issues.apache.org/jira/browse/FLINK-21922 
<https://issues.apache.org/jira/browse/FLINK-21922>.
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.12/flinkDev/building.html#build-pyflink>


> 2021年4月19日 下午5:16,Sumeet Malhotra <sumeet.malho...@gmail.com> 写道:
> 
> Thanks Guowei. I'm trying out Over Windows, as follows:
> 
> input \
>     .over_window(
>         Over.partition_by(col(input.a)) \
>         .order_by(input.Timestamp) \
>         .preceding(lit(10).seconds) \
>         .alias('w')) \
>     .select(
>         input.b,
>         input.c.avg.over(col('w'))) \
>     .execute_insert('MySink') \
>     .wait()
> 
> But running into following exception:
> 
> py4j.protocol.Py4JError: An error occurred while calling 
> z:org.apache.flink.table.api.Over.partitionBy. Trace:
> org.apache.flink.api.python.shaded.py4j.Py4JException: Method 
> partitionBy([class org.apache.flink.table.api.ApiExpression]) does not exist
> 
> Is there any extra Jar that needs to be included for Over Windows. From the 
> code it doesn't appear so.
> 
> Thanks,
> Sumeet
> 
> 
> On Mon, Apr 19, 2021 at 1:10 PM Guowei Ma <guowei....@gmail.com 
> <mailto:guowei....@gmail.com>> wrote:
> Hi, Sumeet
> 
> Maybe you could try the Over Windows[1], which could keep the "non-group-key" 
> column.
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#over-windows>
> 
> Best,
> Guowei
> 
> 
> On Mon, Apr 19, 2021 at 3:25 PM Sumeet Malhotra <sumeet.malho...@gmail.com 
> <mailto:sumeet.malho...@gmail.com>> wrote:
> Thanks Guowei! Regarding "input.c.avg" you're right, that doesn't cause any 
> issues. It's only when I want to use "input.b".
> 
> My use case is to basically emit "input.b" in the final sink as is, and not 
> really perform any aggregation on that column - more like pass through from 
> input to sink. What's the best way to achieve this? I was thinking that 
> making it part of the select() clause would do it, but as you said there 
> needs to be some aggregation performed on it.
> 
> Thanks,
> Sumeet
> 
> 
> On Mon, Apr 19, 2021 at 12:26 PM Guowei Ma <guowei....@gmail.com 
> <mailto:guowei....@gmail.com>> wrote:
> Hi, Sumeet
>       For "input.b" I think you should aggregate the non-group-key column[1]. 
> But I am not sure why the "input.c.avg.alias('avg_value')"  has resolved 
> errors. Would you mind giving more detailed error information?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#group-windows>
> 
> Best,
> Guowei
> 
> 
> On Mon, Apr 19, 2021 at 2:24 PM Sumeet Malhotra <sumeet.malho...@gmail.com 
> <mailto:sumeet.malho...@gmail.com>> wrote:
> Hi,
> 
> I have a use case where I'm creating a Tumbling window as follows:
> 
> "input" table has columns [Timestamp, a, b, c]
> 
> input \
>     .window(Tumble.over(lit(10).seconds).on(input.Timestamp).alias("w")) \
>     .group_by(col('w'), input.a) \
>     .select( 
>         col('w').start.alias('window_start'),
>         col('w').end.alias('window_end'),
>         input.b,
>         input.c.avg.alias('avg_value')) \
>     .execute_insert('MySink') \
>     .wait()
> 
> This throws an exception that it cannot resolve the fields "b" and "c" inside 
> the select statement. If I mention these column names inside the group_by() 
> statement as follows:
> 
> .group_by(col('w'), input.a, input.b, input.c)
> 
> then the column names in the subsequent select statement can be resolved.
> 
> Basically, unless the column name is explicitly made part of the group_by() 
> clause, the subsequent select() clause doesn't resolve it. This is very 
> similar to the example from Flink's documentation here [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html#overview--examples>,
>  where a similar procedure works.
> 
> Any idea how I can access columns from the input stream, without having to 
> mention them in the group_by() clause? I really don't want to group the 
> results by those fields, but they need to be written to the sink eventually.
> 
> Thanks,
> Sumeet

Reply via email to