Hi Manas, Maybe it is the bug of Java Descriptor. You can try the DDL[1] way which is the more recommended way
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/ Best, Xingbo Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月13日周一 下午7:23写道: > Hi Manas, > > Do you have the same error if you replace > > .group_by("five_sec_window, monitorId") \ > > with > > .group_by("five_sec_window") \ > > ? > > Regards, > Roman > > > On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com> wrote: > >> Hi, >> I have the following piece of code (for pyFlink v1.11) : >> >> t_env.from_path(INPUT_TABLE) \ >> .select("monitorId, data, rowtime") \ >> .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) >> \ >> .group_by("five_sec_window, monitorId") \ >> .select("monitorId, data.avg, data.min, data.max, >> five_sec_window.rowtime") \ >> .execute_insert(OUTPUT_TABLE) >> >> Which is generating the exception : >> >> Traceback (most recent call last): >> >> >> * File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in >> <module> .select("monitorId, data.avg, data.min, data.max, >> five_sec_window.rowtime") \* File >> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", >> line 907, in select >> return Table(self._j_table.select(fields), self._t_env) >> File >> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", >> line 1286, in __call__ >> answer, self.gateway_client, self.target_id, self.name) >> File >> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", >> line 147, in deco >> return f(*a, **kw) >> File >> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", >> line 328, in get_return_value >> format(target_id, ".", name), value) >> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select. >> >> *: org.apache.flink.table.api.ValidationException: A group window expects >> a time attribute for grouping in a stream environment.* >> >> The "rowtime" attribute in INPUT_TABLE is created as : >> >> exec_env = StreamExecutionEnvironment.get_execution_environment() >> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime) >> t_env = StreamTableEnvironment.create(exec_env, >> >> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build() >> ) >> >> ... >> >> .field("rowtime", DataTypes.TIMESTAMP(3)) >> .rowtime( >> Rowtime() >> .timestamps_from_field("time_st") >> .watermarks_periodic_ascending()) >> >> ).create_temporary_table(INPUT_TABLE) >> >> >> What is wrong with the code? I believe that I have already indicated >> which attribute has to be treated as the time attribute. >> >> Thank you, >> Manas >> >