Re: Re: Re: retract stream UDAF使用问题

2020-12-09 文章 Jark Wu
你可以把 upsert kafka 想象成是 mysql 表的实时物化视图, 你在 mysql 里面 code 是 key,amount 是 value。当你把 amount 从0 更新成 100, 200。 那么最后的 sum(amount) 结果自然是 200。 如果你想要 0 -> 100 -> 300, 说明你不想把这个数据看成是有 pk 更新的数据,而是一条条独立的数据,这个时候你声明成 kafka connector,不定义 pk 即可,也就是当成普通 log 处理了。 关于你的 UDAF 的问题,估计是你实现的问题,因为你在 retract 方法中又把值设回

Re:Re: Re: retract stream UDAF使用问题

2020-12-09 文章 bulterman
假设Code X,第一条数据X.Amount=0,第二条数据X.Amount=100,第三条数据X.Amount=200 1、由于Code是主键,table中每次仅保留了第最新那条X的数据,因此select sum(X.Amount) from table的输出是 :0, 100, 200 2、我定义UDAF中,对于同一个Code X来说,在accumulate方法中每次都会执行acc.lastAmount = Amount去更新acc的状态,但从结果来看,对于同一个Code X,每一次进入方法acc.lastAmount都是0? 也是因为表中仅保留一条Code X的数据的关系吗?

Re: Re: retract stream UDAF使用问题

2020-12-09 文章 Jark Wu
因为你的 upsert kafka table 的 pk 是 code,所以 code 分组下,数据已经是唯一的了 (一个 code 下,只有一行,取最后一行作为最新数据)。估计你同样的 code 下,amount 值是一样的,所以 sum(amount) 自然不会变化。 Best, Jark On Thu, 10 Dec 2020 at 12:36, bulterman <15618338...@163.com> wrote: > // kafka table > tableEnv.execuetSql("CREATE TABLE market_stock(\n" + > >

Re: retract stream UDAF使用问题

2020-12-09 文章 Jark Wu
可以发下代码吗? On Thu, 10 Dec 2020 at 11:19, bulterman <15618338...@163.com> wrote: > 上游是upsert-kafka connector 创建的table, > 使用UDAF时发现accumlator里的变量一直没被更新?如果使用kafka connector是正常更新的 > (为了测试方便,table里只有同一个PK的数据)