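Hi! Window TVFs currently do not support consuming a changelog; they can only consume insert-only data. The upsert-kafka source produces a changelog (update and delete records in addition to inserts), so a window TVF cannot be placed downstream of it. That is what the error is telling you: the ChangelogNormalize(key=[id]) node that the planner adds for the upsert-kafka source emits update and delete changes, and StreamPhysicalWindowAggregate cannot accept them.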
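If counting every Kafka record as an independent row is acceptable for your use case, one possible workaround is to declare the same topic a second time with the plain kafka connector, which is insert-only, and run the window TVF on that table. The following is only a sketch under assumptions: the table name t_k_chargeorder_append, the topic, the bootstrap servers, the group id, the json format, and all column types are hypothetical, since the original DDL was not posted. The filters from the original query are left out for brevity.

```sql
-- Sketch only: table name, topic, servers, group id, format and column
-- types are assumptions; adjust them to match the real t_k_chargeorder DDL.
CREATE TABLE t_k_chargeorder_append (
  id BIGINT,
  recordcreateduser BIGINT,
  status STRING,
  _is_delete STRING,
  appointmentid BIGINT,
  actualprice DECIMAL(18, 2),
  proc_time AS PROCTIME()              -- processing-time attribute for the window
) WITH (
  'connector' = 'kafka',               -- plain Kafka source, insert-only
  'topic' = 'chargeorder',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'chargeorder-window',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- The same CUMULATE aggregation, now over an insert-only input.
SELECT window_start, window_end, SUM(actualprice) AS total_price
FROM TABLE(
  CUMULATE(TABLE t_k_chargeorder_append, DESCRIPTOR(proc_time),
           INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
```

Note that this changes the semantics: without upsert handling, an order that is written to the topic twice contributes its actualprice to the sum twice, so this only fits if that is what you want or if each id appears once.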
赵旭晨 <jjrr...@163.com> wrote on Tuesday, March 1, 2022 at 15:23:

> The SQL is as follows:
> with effective_chargeorder as (
> select
> o.recordcreatedtime,o.recordcreateduser,o.status,o._is_delete,o.appointmentid,
> o.id,o.tenantid,o.actualprice,o.proc_time from t_k_chargeorder as o
> where o.recordcreateduser > 0 and o.status NOT IN ( '已作废', '未收费', '作废并撤回',
> '等待支付宝付费', '等待微信付费' )
> and o._is_delete = '0' and o.appointmentid > 0
> )
> --select * from effective_chargeorder;
> SELECT window_start, window_end, SUM(actualprice)
> FROM TABLE(
> CUMULATE(TABLE effective_chargeorder, DESCRIPTOR(proc_time),
> INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
> GROUP BY window_start, window_end;
>
> The IDE reports the following error:
> StreamPhysicalWindowAggregate doesn't support consuming update and delete
> changes which is produced by node ChangelogNormalize(key=[id])
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:394)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:353)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
> at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:341)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>
> The source being consumed is upsert-kafka. I am not very familiar with the planner layer and cannot quite make sense of this error, so any pointers would be appreciated.