Hi Manas,

Do you have the same error if you replace

    .group_by("five_sec_window, monitorId") \

with

    .group_by("five_sec_window") \

?

Regards,
Roman

On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com> wrote:

> Hi,
> I have the following piece of code (for pyFlink v1.11) :
>
> t_env.from_path(INPUT_TABLE) \
>     .select("monitorId, data, rowtime") \
>     .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
>     .group_by("five_sec_window, monitorId") \
>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>     .execute_insert(OUTPUT_TABLE)
>
> Which is generating the exception :
>
> Traceback (most recent call last):
>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
>     return Table(self._j_table.select(fields), self._t_env)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>     return f(*a, **kw)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
>
> The "rowtime" attribute in INPUT_TABLE is created as :
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(exec_env,
>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
> )
>
> ...
>
>     .field("rowtime", DataTypes.TIMESTAMP(3))
>     .rowtime(
>         Rowtime()
>         .timestamps_from_field("time_st")
>         .watermarks_periodic_ascending())
>
> ).create_temporary_table(INPUT_TABLE)
>
>
> What is wrong with the code? I believe that I have already indicated which
> attribute has to be treated as the time attribute.
>
> Thank you,
> Manas
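
For reference, the result the quoted query is meant to produce (avg, min and max of "data" per monitorId per 5-minute tumbling window) can be sketched in plain Python. This is not PyFlink and does not reproduce or fix the ValidationException; the helper names (window_start, tumbling_aggregate) and the sample rows are made up for illustration, and the sketch keys each window by its start time purely for simplicity.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)  # same size as Tumble.over("5.minutes")
EPOCH = datetime(1970, 1, 1)   # windows are aligned to the epoch (assumption)


def window_start(ts):
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def tumbling_aggregate(rows):
    """Group (monitor_id, data, rowtime) rows by (window, monitor_id).

    Returns {(window_start, monitor_id): (avg, min, max)}.
    """
    groups = defaultdict(list)
    for monitor_id, data, rowtime in rows:
        groups[(window_start(rowtime), monitor_id)].append(data)
    return {
        key: (sum(values) / len(values), min(values), max(values))
        for key, values in groups.items()
    }


# Hypothetical sample input, not taken from the thread.
rows = [
    ("m1", 10.0, datetime(2020, 7, 13, 11, 0, 30)),
    ("m1", 20.0, datetime(2020, 7, 13, 11, 4, 59)),
    ("m1", 40.0, datetime(2020, 7, 13, 11, 5, 0)),
    ("m2", 5.0, datetime(2020, 7, 13, 11, 1, 0)),
]
result = tumbling_aggregate(rows)
for (start, monitor_id), stats in sorted(result.items()):
    print(start, monitor_id, stats)
```

Grouping by the window alone, as in the suggested test above, would correspond to dropping monitor_id from the dictionary key, so all monitors in a window would be aggregated together.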