谢谢两位大佬的回复。
还剩下两个小问题,
①请问输出结果里面的"+"是什么意思,不应该是2017吗?
1>
(true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)
8>
(true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)
4>
(true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)
最后的999是啥意思???
②代码中有两处时间限定
.window(Tumble.over(lit(5).minutes())
$("user"),
$("w").start(),//输出很奇怪
$("w").end(),
$("w").rowtime(),
请问这两处时间限定有什么区别吗?
是不是
前者是"全局范围限定"?
后者是在前者限定的基础上做进一步限定?
如果是的话,end里面是否可以设定时间戳?
----------------------------------------------------------------------
昨天的完整代码是:
https://paste.ubuntu.com/p/9JsFDKC5V8/
谢谢谢谢~!!!
在 2020-12-10 12:02:31,"Leonard Xu" <[email protected]> 写道:
>Hi,
>补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>
>给出文档中省略的watermark生成部分code:
>
> // 老版本
>// Table orders =
>tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new
>AscendingTimestampExtractor<Order>() {
>// @Override
>// public long extractAscendingTimestamp(Order element) {
>// return element.rowtime;
>// }
>// }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
> // 新版本
> Table orders =
> tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx)
> -> new WatermarkGenerator<Order>() {
> @Override
> public void onEvent(Order order, long eventTimestamp,
> WatermarkOutput watermarkOutput) {
> watermarkOutput.emitWatermark(new
> Watermark(eventTimestamp));
> }
> @Override
> public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
> }
> }))
> , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>如果代码不多,可以直接贴在邮件中哈。
>
>
>祝好,
>Leonard
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>
>> 在 2020年12月10日,11:10,Jark Wu <[email protected]> 写道:
>>
>> 链接错了。重发下。
>>
>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>> 表。这一行应该执行不成功把。
>>
>> Best,
>> Jark
>>
>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[email protected]> wrote:
>>
>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>> 表。这一行应该执行不成功把。
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[email protected]> wrote:
>>>
>>>> 代码是:
>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>> 报错:
>>>> A group window expects a time attribute for grouping in a stream
>>>> environment.
>>>> 但是代码的数据源中已经有时间属性了.
>>>> 请问应该怎么修改代码?
>>>> 谢谢
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>