Hi Manas,

Yes, this is a bug which I have also encountered in the Descriptor API, but
I couldn't find a corresponding issue. You can create an issue to report
this problem. There are similar bugs in the current Descriptor API, which
is why DDL is the more recommended way. The community has also started a
discussion on refactoring the Descriptor API [1].
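
In case it is useful in the meantime, here is a rough, untested sketch of how
the source table could be declared via DDL instead of the Descriptor API. The
table name, field types and the Kafka connector options are just placeholders,
please adapt them to your actual setup:

t_env.execute_sql("""
    CREATE TABLE input_table (
        monitorId STRING,
        data DOUBLE,
        time_st TIMESTAMP(3),
        -- declare time_st as the event-time attribute; an ascending watermark
        -- roughly corresponds to watermarks_periodic_ascending()
        WATERMARK FOR time_st AS time_st
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'input-topic',
        'properties.bootstrap.servers' = 'localhost:9092',
        'format' = 'json'
    )
""")

With the table declared this way, time_st is a proper time attribute and can be
used directly, e.g. Tumble.over("5.minutes").on("time_st").alias("five_sec_window").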

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-129-Refactor-Descriptor-API-to-register-connector-in-Table-API-tt42995.html

Best,
Xingbo

On Tue, Jul 14, 2020 at 12:50 PM Manas Kale <manaskal...@gmail.com> wrote:

> @Roman - yes, I get the same error if I do that.
> @Xingbo Huang <hxbks...@gmail.com> - okay, I didn't know DDL was the more
> recommended way.
> Please let me know if you confirm that this is a bug.
> Thanks!
>
> On Mon, Jul 13, 2020 at 5:07 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Manas,
>> Maybe it is a bug in the Java Descriptor API. You can try the DDL [1] way,
>> which is the more recommended approach.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/
>>
>> Best,
>> Xingbo
>>
>> On Mon, Jul 13, 2020 at 7:23 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>> Do you have the same error if you replace
>>>
>>>     .group_by("five_sec_window, monitorId") \
>>>
>>> with
>>>
>>>     .group_by("five_sec_window") \
>>>
>>> ?
>>>
>>> Regards,
>>> Roman
>>>
>>>
>>> On Mon, Jul 13, 2020 at 11:16 AM Manas Kale <manaskal...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I have the following piece of code (for pyFlink v1.11) :
>>>>
>>>> t_env.from_path(INPUT_TABLE) \
>>>>     .select("monitorId, data, rowtime") \
>>>>     .window(Tumble.over("5.minutes").on("rowtime").alias("five_sec_window")) \
>>>>     .group_by("five_sec_window, monitorId") \
>>>>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>>>>     .execute_insert(OUTPUT_TABLE)
>>>>
>>>> Which generates the exception:
>>>>
>>>> Traceback (most recent call last):
>>>>   File "/home/manas/IU_workspace/Flink_POC/pyflink/main.py", line 124, in <module>
>>>>     .select("monitorId, data.avg, data.min, data.max, five_sec_window.rowtime") \
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/table/table.py", line 907, in select
>>>>     return Table(self._j_table.select(fields), self._t_env)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
>>>>     answer, self.gateway_client, self.target_id, self.name)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
>>>>     return f(*a, **kw)
>>>>   File "/home/manas/anaconda3/envs/flink_env/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
>>>>     format(target_id, ".", name), value)
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling o87.select.
>>>> : org.apache.flink.table.api.ValidationException: A group window expects a time attribute for grouping in a stream environment.
>>>>
>>>> The "rowtime" attribute in INPUT_TABLE is created as :
>>>>
>>>> exec_env = StreamExecutionEnvironment.get_execution_environment()
>>>> exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>>> t_env = StreamTableEnvironment.create(
>>>>     exec_env,
>>>>     environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
>>>>
>>>> ...
>>>>
>>>>      .field("rowtime", DataTypes.TIMESTAMP(3))
>>>>         .rowtime(
>>>>             Rowtime()
>>>>             .timestamps_from_field("time_st")
>>>>             .watermarks_periodic_ascending())
>>>>
>>>> ).create_temporary_table(INPUT_TABLE)
>>>>
>>>>
>>>> What is wrong with the code? I believe that I have already indicated
>>>> which attribute has to be treated as the time attribute.
>>>>
>>>> Thank you,
>>>> Manas
>>>>
>>>
