Hi Manas,
This may be a bug in the Java Descriptor API. You can try the DDL [1] approach
instead, which is the recommended way to define tables.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
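
For illustration, a DDL-based definition of the input table might look roughly
like the sketch below. It is only a sketch under assumptions: the column types
for monitorId and data are guesses, the helper names build_input_ddl and
register_input_table are made up for this example, and the connector options
are left to the caller because the thread does not say which connector is
used. The watermark expression is the one the Flink documentation gives for
ascending timestamps, which corresponds to watermarks_periodic_ascending().

```python
def build_input_ddl(table_name, connector_options):
    """Build a CREATE TABLE statement that declares an event-time attribute.

    Hypothetical helper: the column types below are assumptions, and
    connector_options must be filled in for whatever connector is in use.
    """
    with_clause = ",\n".join(
        "    '{}' = '{}'".format(key, value)
        for key, value in connector_options.items()
    )
    return (
        "CREATE TABLE {name} (\n"
        "    monitorId STRING,\n"          # assumed type
        "    `data` DOUBLE,\n"             # assumed type; quoted in case it clashes with a SQL keyword
        "    time_st TIMESTAMP(3),\n"
        "    rowtime AS time_st,\n"        # computed column exposing time_st as 'rowtime'
        "    WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND\n"
        ") WITH (\n"
        "{options}\n"
        ")"
    ).format(name=table_name, options=with_clause)


def register_input_table(t_env, table_name, connector_options):
    """Register the table on an existing (Stream)TableEnvironment."""
    t_env.execute_sql(build_input_ddl(table_name, connector_options))
```

With the table registered this way, the windowed query can keep using
.on("rowtime"), since the WATERMARK clause is what marks that column as the
event-time attribute.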

Best,
Xingbo

Khachatryan Roman <khachatryan.ro...@gmail.com> wrote on Mon, Jul 13, 2020 at 7:23 PM:

> Hi Manas,
>
> Do you have the same error if you replace
>
>     .group_by("five_sec_window, monitorId") \
>
> with
>
>     .group_by("five_sec_window") \
>
> ?
>
> Regards,
> Roman
>
>
> On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi,
>> I have the following piece of code (for PyFlink v1.11):
>>
>> t_env.from_path(INPUT_TABLE) \
>>     .select("monitorId, data, rowtime") \
>>     .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
>>     .group_by("five_sec_window, monitorId") \
>>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>>     .execute_insert(OUTPUT_TABLE)
>>
>> which generates the following exception:
>>
>> Traceback (most recent call last):
>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
>>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
>>     return Table(self._j_table.select(fields), self._t_env)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>     answer, self.gateway_client, self.target_id, self.name)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>     return f(*a, **kw)
>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>     format(target_id, ".", name), value)
>> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
>> : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
>>
>> The "rowtime" attribute in INPUT_TABLE is created as follows:
>>
>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>> t_env = StreamTableEnvironment.create(
>>     exec_env,
>>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>> )
>>
>> ...
>>
>>      .field("rowtime", DataTypes.TIMESTAMP(3))
>>         .rowtime(
>>             Rowtime()
>>             .timestamps_from_field("time_st")
>>             .watermarks_periodic_ascending())
>>
>> ).create_temporary_table(INPUT_TABLE)
>>
>>
>> What is wrong with the code? I believe that I have already indicated
>> which attribute has to be treated as the time attribute.
>>
>> Thank you,
>> Manas
>>
>
