@Roman - yes, I still get the error if I do that. @Xingbo Huang <hxbks...@gmail.com> - okay, I didn't know DDL was the recommended way. Please let me know if you can confirm that this is a bug. Thanks!
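For reference, here is a rough sketch of the same input table declared through DDL, following Xingbo's suggestion. It assumes PyFlink 1.11 with the Blink planner. Several details are assumptions because the original descriptor code elides them: the column types of `monitorId`, `data` and `time_st`, the table names, and the connector options. The `WATERMARK ... - INTERVAL '0.001' SECOND` clause is the "ascending timestamps" strategy described in the Flink 1.11 `CREATE TABLE` docs. It is meant as a counterpart to `watermarks_periodic_ascending()`. This thread does not confirm that the DDL route avoids the `ValidationException`.

```python
# Minimal sketch, assuming PyFlink 1.11 + Blink planner.
# ASSUMPTIONS: column types, table names and connector options are
# hypothetical placeholders; the original descriptor code elides them.

INPUT_TABLE = "monitor_input"    # hypothetical name
OUTPUT_TABLE = "monitor_output"  # hypothetical name


def input_table_ddl(table_name: str, connector_options: dict) -> str:
    """Build a CREATE TABLE statement in which `rowtime` is an event-time
    attribute computed from `time_st` (assumed to be TIMESTAMP(3))."""
    # Render the WITH clause from the (hypothetical) connector options.
    with_clause = ",\n".join(
        f"    '{key}' = '{value}'" for key, value in connector_options.items()
    )
    return (
        f"CREATE TABLE {table_name} (\n"
        "    monitorId STRING,\n"
        # `data` is backtick-quoted in case it clashes with a SQL keyword.
        "    `data` DOUBLE,\n"
        "    time_st TIMESTAMP(3),\n"
        # Computed column that the WATERMARK clause turns into the rowtime attribute.
        "    rowtime AS time_st,\n"
        # Ascending-timestamps watermark, intended to mirror
        # Rowtime().watermarks_periodic_ascending() from the descriptor API.
        "    WATERMARK FOR rowtime AS rowtime - INTERVAL '0.001' SECOND\n"
        f") WITH (\n{with_clause}\n)"
    )


def windowed_stats(t_env, input_table: str, output_table: str):
    """Run the windowed aggregation from the original question.

    `t_env` is assumed to be a PyFlink 1.11 StreamTableEnvironment on which
    the input DDL (and a DDL for `output_table`) has already been executed.
    """
    # Imported lazily so the DDL helper above can be used without PyFlink.
    from pyflink.table.window import Tumble

    return (
        t_env.from_path(input_table)
        .select("monitorId, data, rowtime")
        .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window"))
        .group_by("five_sec_window, monitorId")
        .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime")
        .execute_insert(output_table)
    )
```

Usage would be roughly `t_env.execute_sql(input_table_ddl(INPUT_TABLE, {"connector": "filesystem", "path": "/tmp/input.csv", "format": "csv"}))`, followed by `windowed_stats(t_env, INPUT_TABLE, OUTPUT_TABLE)`. The filesystem/CSV options here are only an example. The real source options would replace them.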
On Mon, Jul 13, 2020 at 5:07 PM Xingbo Huang <hxbks...@gmail.com> wrote:
> Hi Manas,
> Maybe it is a bug in the Java descriptor. You can try the DDL [1] way, which
> is the recommended approach.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
>
> Best,
> Xingbo
>
> On Mon, Jul 13, 2020 at 7:23 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>
>> Hi Manas,
>>
>> Do you have the same error if you replace
>>
>> .group_by("five_sec_window, monitorId") \
>>
>> with
>>
>> .group_by("five_sec_window") \
>>
>> ?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I have the following piece of code (for PyFlink v1.11):
>>>
>>> t_env.from_path(INPUT_TABLE) \
>>>     .select("monitorId, data, rowtime") \
>>>     .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
>>>     .group_by("five_sec_window, monitorId") \
>>>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>>>     .execute_insert(OUTPUT_TABLE)
>>>
>>> which generates the following exception:
>>>
>>> Traceback (most recent call last):
>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
>>>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
>>>     return Table(self._j_table.select(fields), self._t_env)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>     answer, self.gateway_client, self.target_id, self.name)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>     return f(*a, **kw)
>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>     format(target_id, ".", name), value)
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
>>> : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
>>>
>>> The "rowtime" attribute in INPUT_TABLE is created as follows:
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> t_env = StreamTableEnvironment.create(exec_env,
>>>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>> )
>>>
>>> ...
>>>
>>>     .field("rowtime", DataTypes.TIMESTAMP(3))
>>>     .rowtime(
>>>         Rowtime()
>>>         .timestamps_from_field("time_st")
>>>         .watermarks_periodic_ascending())
>>>
>>> ).create_temporary_table(INPUT_TABLE)
>>>
>>>
>>> What is wrong with the code? I believe that I have already indicated
>>> which attribute has to be treated as the time attribute.
>>>
>>> Thank you,
>>> Manas