你可以把 upsert kafka 想象成是 mysql 表的实时物化视图,
你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。
那么最后的 sum(amount) 结果自然是 200。
如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka
connector,不定义 pk 即可,也就是当成普通 log 处理了。
关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回
假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200
1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 :0,
100, 200
2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount =
Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? 也是因为表中仅保留一条Code
X的数据的关系吗?
因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code
下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。
Best,
Jark
On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote:
> // kafka table
> tableEnv.execuetSql("CREATE TABLE market_stock(\n" +
>
>
可以发下代码吗?
On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote:
> 上游是upsert-kafka connector 创建的table,
> 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的
> (为了测试方便,table里只有同一个PK的数据)