Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-04-15 文章 HunterXHunter
有人知道这个bug吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-10 文章 HunterXHunter
试了 1.12.2,还是一样问题。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-10 文章 HunterXHunter
但是看情况好像是只有在:DataStream发生Keyby或者 setParallelism的时候才会发生



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-10 文章 HunterXHunter
1.12.1



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 文章 Leonard Xu
你好,
你的flink版本是多少?
之前有个bug是Table转datastream 会丢rowtime问题,看起来是这个问题。

我在[1]里修复了,你可以升级对应的版本试下。


祝好,
Leonard
[1]https://issues.apache.org/jira/browse/FLINK-21013 
 



> 在 2021年3月10日,14:34,HunterXHunter <1356469...@qq.com> 写道:
> 
> 再试了一下:
> 修改并行度也不行
>.setParallelism(9)
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 文章 HunterXHunter
再试了一下:
修改并行度也不行
.setParallelism(9)




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-09 文章 HunterXHunter
经过再一次验证:
即使我做group by rowtime的操作,
我对datastream做keyby(rowtime) 也有这个问题
例如:
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test "));
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
.keyby(_.f1)
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
);

结果也是无法触发窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


BUG :DataStream 转 Table 后无法 触发窗口计算

2021-03-08 文章 HunterXHunter
1:当DataStream是由 一个table 经过 group by rowtime 转换过来的就无法触发窗口

例如:
 tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test group by msg,rowtime"));

// 获得 DataStream,并定义wtm生成
SingleOutputStreamOperator r =
tableEnv.toRetractStream(tableEnv.from("test3"), Row.class)
.filter(x -> x.f0)
// map 
.returns(Types.TUPLE(Types.STRING, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(((element,
recordTimestamp) -> element.f1))
);


参考 官方文档:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/time_attributes.html


// stream - 转 Table,指定Rowtime
tableEnv.createTemporaryView("test5",
r,
$("msg"),
$("rowtime").rowtime());

String sql5 = "select " +
"msg," +
"count(1) cnt" +
" from test5 " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
"";
tableEnv.executeSql("insert into printlnRetractSink " + sql5);


结果: 无法触发窗口操作。
查调试源码: org.apache.flink.table.runtime.operators.window.WindowOperator
// 返回的wtm永远都是 -9223372036854775808
public long getCurrentWatermark() {
return internalTimerService.currentWatermark();
}

//
查看任务,watermark是正常在生成的。InternalTimerServiceImpl.advanceWatermark是正常为currentWatermark赋值。但是
internalTimerService.currentWatermark() 却拿的是-9223372036854775808

// 当  tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select
msg,rowtime from test group by msg,rowtime"));
语句改为
tableEnv.createTemporaryView("test3", tableEnv.sqlQuery("select msg,rowtime
from test"));

结果就是正确的。
所以这是一个bug吗??








--
Sent from: http://apache-flink.147419.n8.nabble.com/