Hi Manas,

Do you get the same error if you replace

    .group_by("five_sec_window, monitorId") \

with

    .group_by("five_sec_window") \

?
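
To make the difference between the two groupings concrete, here is a minimal plain-Python sketch (no Flink involved) of what a 5-minute tumbling window computes when grouped by window and monitorId versus by window only. The helper names `window_start` and `aggregate` and the sample rows are made up for illustration; this only mimics the intended result and says nothing about how Flink validates time attributes.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Window size taken from Tumble.over("5.minutes") in the quoted query.
WINDOW = timedelta(minutes=5)
EPOCH = datetime(1970, 1, 1)


def window_start(ts, size=WINDOW):
    """Floor a timestamp to the start of its tumbling window (hypothetical helper)."""
    return EPOCH + ((ts - EPOCH) // size) * size


def aggregate(rows, by_monitor=True):
    """Return {group key: (avg, min, max)} for rows of (monitorId, data, rowtime)."""
    groups = defaultdict(list)
    for monitor_id, data, rowtime in rows:
        start = window_start(rowtime)
        # Group by (window, monitorId) or by window alone.
        key = (start, monitor_id) if by_monitor else (start,)
        groups[key].append(data)
    return {k: (sum(v) / len(v), min(v), max(v)) for k, v in groups.items()}


# Made-up sample data: three rows in the 11:00 window, one in the 11:05 window.
rows = [
    ("m1", 1.0, datetime(2020, 7, 13, 11, 0, 10)),
    ("m1", 3.0, datetime(2020, 7, 13, 11, 4, 59)),
    ("m2", 10.0, datetime(2020, 7, 13, 11, 1, 0)),
    ("m1", 5.0, datetime(2020, 7, 13, 11, 5, 0)),
]

per_monitor = aggregate(rows, by_monitor=True)    # like group_by("five_sec_window, monitorId")
per_window = aggregate(rows, by_monitor=False)    # like group_by("five_sec_window")

for key, stats in sorted(per_monitor.items()):
    print(key, stats)
for key, stats in sorted(per_window.items()):
    print(key, stats)
```

If the window-only variant runs in Flink without the exception, that would suggest the problem is tied to the extra grouping key rather than to the rowtime definition itself.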

Regards,
Roman


On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com> wrote:

> Hi,
> I have the following piece of code (for PyFlink v1.11):
>
> t_env.from_path(INPUT_TABLE) \
>     .select("monitorId, data, rowtime") \
>     .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
>     .group_by("five_sec_window, monitorId") \
>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>     .execute_insert(OUTPUT_TABLE)
>
> which generates the following exception:
>
> Traceback (most recent call last):
>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
>     return Table(self._j_table.select(fields), self._t_env)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>     answer, self.gateway_client, self.target_id, self.name)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>     return f(*a, **kw)
>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>     format(target_id, ".", name), value)
> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
> : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
>
> The "rowtime" attribute in INPUT_TABLE is created as follows:
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> t_env = StreamTableEnvironment.create(
>     exec_env,
>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>
> ...
>
>      .field("rowtime", DataTypes.TIMESTAMP(3))
>         .rowtime(
>             Rowtime()
>             .timestamps_from_field("time_st")
>             .watermarks_periodic_ascending())
>
> ).create_temporary_table(INPUT_TABLE)
>
>
> What is wrong with the code? I believe that I have already indicated which
> attribute has to be treated as the time attribute.
>
> Thank you,
> Manas
>
