谢谢两位大佬的回复。
还剩下两个小问题,
①请问输出结果里面的"+"是什么意思,不应该是2017吗?
1> 
(true,3,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,4)
8> 
(true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,6)
4> 
(true,1,+1705471-09-26T16:50,+1705471-09-26T16:55,+1705471-09-26T16:54:59.999,8)
最后的999是啥意思???


②代码中有两处时间限定
.window(Tumble.over(lit(5).minutes())
$("user"),
$("w").start(),//输出很奇怪
$("w").end(),
$("w").rowtime(),
请问这两处时间限定有什么区别吗?
是不是
前者是"全局范围限定"?
后者是在前者限定的基础上做进一步限定?
如果是的话,end里面是否可以设定时间戳?
----------------------------------------------------------------------
昨天的完整代码是:
https://paste.ubuntu.com/p/9JsFDKC5V8/


谢谢谢谢~!!!











在 2020-12-10 12:02:31,"Leonard Xu" <[email protected]> 写道:
>Hi,
>补充下昨天线下提供的答疑,在从datastream 转换到Table时,如果希望转换后的Table上能够继续使用watermark,
>需要(1)让datastream中包含watermark (2)在table上声明event time 属性. 文档可以参考[1]
>
>给出文档中省略的watermark生成部分code:
>
>        // 老版本
>//        Table orders = 
>tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(new 
>AscendingTimestampExtractor<Order>() {
>//                @Override
>//                public long extractAscendingTimestamp(Order element) {
>//                    return element.rowtime;
>//                }
>//            }), $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>        // 新版本
>        Table orders = 
> tEnv.fromDataStream(orderA.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator((ctx)
>  -> new WatermarkGenerator<Order>() {
>                @Override
>                public void onEvent(Order order, long eventTimestamp, 
> WatermarkOutput watermarkOutput) {
>                    watermarkOutput.emitWatermark(new 
> Watermark(eventTimestamp));
>                }
>                @Override
>                public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
>                }
>            }))
>            , $("user"), $("product"), $("amount"),$("rowtime").rowtime());
>如果代码不多,可以直接贴在邮件中哈。
>
>
>祝好,
>Leonard
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/table/streaming/time_attributes.html#%E5%9C%A8-datastream-%E5%88%B0-table-%E8%BD%AC%E6%8D%A2%E6%97%B6%E5%AE%9A%E4%B9%89-1
>
>> 在 2020年12月10日,11:10,Jark Wu <[email protected]> 写道:
>> 
>> 链接错了。重发下。
>> 
>> 1) 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html
>> <https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html>
>> 2) 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>> 表。这一行应该执行不成功把。
>> 
>> Best,
>> Jark
>> 
>> On Thu, 10 Dec 2020 at 11:09, Jark Wu <[email protected]> wrote:
>> 
>>> 1). 所谓时间属性,不是指 timestamp 类型的字段,而是一个特殊概念。可以看下文档如果声明时间属性:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1).12/dev/table/streaming/time_attributes.html
>>> 2. 你的代码好像也不对。 L45: Table orders = tEnv.from("Orders"); 没看到你有注册过 "Orders"
>>> 表。这一行应该执行不成功把。
>>> 
>>> Best,
>>> Jark
>>> 
>>> On Wed, 9 Dec 2020 at 15:44, Appleyuchi <[email protected]> wrote:
>>> 
>>>> 代码是:
>>>> https://paste.ubuntu.com/p/gVGrj2V7ZF/
>>>> 报错:
>>>> A group window expects a time attribute for grouping in a stream
>>>> environment.
>>>> 但是代码的数据源中已经有时间属性了.
>>>> 请问应该怎么修改代码?
>>>> 谢谢
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>

回复