The SQL is as follows:
WITH effective_chargeorder AS (
  SELECT
    o.recordcreatedtime, o.recordcreateduser, o.status, o._is_delete,
    o.appointmentid, o.id, o.tenantid, o.actualprice, o.proc_time
  FROM t_k_chargeorder AS o
  WHERE o.recordcreateduser > 0
    AND o.status NOT IN ('已作废', '未收费', '作废并撤回', '等待支付宝付费', '等待微信付费')
    AND o._is_delete = '0'
    AND o.appointmentid > 0
)
--select * from effective_chargeorder;
SELECT window_start, window_end, SUM(actualprice)
  FROM TABLE(
    CUMULATE(TABLE effective_chargeorder, DESCRIPTOR(proc_time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
  GROUP BY window_start, window_end;


The IDE reports the following error:
StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node ChangelogNormalize(key=[id])
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:394)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:353)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:342)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:341)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Range.foreach(Range.scala:160)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)


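The query consumes an upsert-kafka source. I am not very familiar with the planner layer and cannot make much sense of this error, so any pointers would be appreciated.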
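
For context, a source of this kind is typically declared along the following lines. This is only a sketch: the column types, topic name, broker address, and formats are assumptions for illustration, not the actual table definition. The parts that matter are the upsert-kafka connector with a primary key on id, which is where ChangelogNormalize(key=[id]) in the error message comes from, and the computed proc_time column used by the window.

```sql
-- Hypothetical DDL: column types and all connector option values are placeholders.
CREATE TABLE t_k_chargeorder (
  id                BIGINT,
  tenantid          BIGINT,
  appointmentid     BIGINT,
  recordcreateduser BIGINT,
  recordcreatedtime TIMESTAMP(3),
  status            STRING,
  `_is_delete`      STRING,
  actualprice       DECIMAL(18, 2),
  -- processing-time attribute referenced by DESCRIPTOR(proc_time)
  proc_time AS PROCTIME(),
  -- upsert-kafka requires a primary key; the planner normalizes the changelog on it
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'chargeorder-topic',                      -- placeholder
  'properties.bootstrap.servers' = 'localhost:9092',  -- placeholder
  'key.format' = 'json',                              -- assumed format
  'value.format' = 'json'                             -- assumed format
);
```

Because such a table emits update and delete changes rather than append-only rows, the window aggregate downstream receives a changelog stream, which is what the error message is complaining about.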










 
