@Roman - yes, I have the error if I do that.
@Xingbo Huang <hxbks...@gmail.com> - okay, I didn't know DDL was the more
recommended way.
Please let me know if you confirm that this is a bug.
Thanks!

On Mon, Jul 13, 2020 at 5:07 PM Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Manas,
> Maybe it is the bug of Java Descriptor. You can try the DDL[1] way which
> is the more recommended way
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
>
> Best,
> Xingbo
>
> Khachatryan Roman <khachatryan.ro...@gmail.com> 于2020年7月13日周一 下午7:23写道:
>
>> Hi Manas,
>>
>> Do you have the same error if you replace
>>
>>     .group_by("five_sec_window, monitorId") \
>>
>> with
>>
>>     .group_by("five_sec_window") \
>>
>> ?
>>
>> Regards,
>> Roman
>>
>>
>> On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I have the following piece of code (for pyFlink v1.11) :
>>>
>>> t_env.from_path(INPUT_TABLE) \
>>>     .select("monitorId, data, rowtime") \
>>>     
>>> .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
>>>     .group_by("five_sec_window, monitorId") \
>>>     .select("monitorId, data.avg, data.min, data.max, 
>>> five_sec_window.rowtime") \
>>>     .execute_insert(OUTPUT_TABLE)
>>>
>>> Which is generating the exception :
>>>
>>> Traceback (most recent call last):
>>>
>>>
>>> * File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124,
>>> in <module>    .select("monitorId, data.avg, data.min, data.max,
>>> five_sec_window.rowtime") \*  File
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py",
>>> line 907, in select
>>>     return Table(self._j_table.select(fields), self._t_env)
>>>   File
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py",
>>> line 1286, in __call__
>>>     answer, self.gateway_client, self.target_id, self.name)
>>>   File
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py",
>>> line 147, in deco
>>>     return f(*a, **kw)
>>>   File
>>> "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py",
>>> line 328, in get_return_value
>>>     format(target_id, ".", name), value)
>>> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
>>>
>>> *: org.apache.flink.table.api.ValidationException: A group window
>>> expects a time attribute for grouping in a stream environment.*
>>>
>>> The "rowtime" attribute in INPUT_TABLE is created as :
>>>
>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> t_env = StreamTableEnvironment.create(exec_env,
>>>                                       
>>> environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
>>>                                       )
>>>
>>> ...
>>>
>>>      .field("rowtime", DataTypes.TIMESTAMP(3))
>>>         .rowtime(
>>>             Rowtime()
>>>             .timestamps_from_field("time_st")
>>>             .watermarks_periodic_ascending())
>>>
>>> ).create_temporary_table(INPUT_TABLE)
>>>
>>>
>>> What is wrong with the code? I believe that I have already indicated
>>> which attribute has to be treated as the time attribute.
>>>
>>> Thank you,
>>> Manas
>>>
>>

Reply via email to