Hi, could you share the concrete plan so everyone can take a look? You can print the relevant information directly with tEnv.executeSql("EXPLAIN JSON_EXECUTION_PLAN <YOUR_QUERY>").print().
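For instance, a minimal sketch, assuming the "header"/"item" views from the mail below are already registered on a StreamTableEnvironment named tEnv. Note that the EXPLAIN <ExplainDetail> SQL syntax may require a newer Flink release than 1.14.4; on older releases, Table#explain(ExplainDetail.JSON_EXECUTION_PLAN) is the equivalent API call:

    // Print the JSON execution plan of the join query (sketch; the query is
    // shortened from the one in the mail below).
    tEnv.executeSql(
            "EXPLAIN JSON_EXECUTION_PLAN "
                + "SELECT header.customer_id, item.goods_id "
                + "FROM header JOIN item ON header.id = item.order_id")
        .print();

    // Equivalent on releases without that SQL syntax: explain a Table directly.
    Table result = tEnv.sqlQuery(
            "SELECT header.customer_id, item.goods_id "
                + "FROM header JOIN item ON header.id = item.order_id");
    System.out.println(result.explain(ExplainDetail.JSON_EXECUTION_PLAN));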
Best,
Shengkai

lxk <lxk7...@163.com> wrote on Fri, Jun 10, 2022 at 10:29:

> Flink version: 1.14.4
>
> I am currently using Flink's interval join to correlate data. While
> testing I noticed a problem: after the interval join, records are lost,
> but when I join directly through the SQL API the data is intact, with
> nothing lost. The watermark is generated directly from the timestamp
> carried by the Kafka records.
>
> The code is below --- interval join:
>
> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>     headerFilterStream.keyBy(data -> data.getId())
>         .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>         .between(Time.seconds(-10), Time.seconds(20))
>         .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>             @Override
>             public void processElement(OrderHeader left, OrderItem right,
>                     Context context, Collector<HeaderFull> collector) throws Exception {
>                 HeaderFull headerFull = new HeaderFull();
>                 BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>                 beanUtilsBean.copyProperties(headerFull, left);
>                 beanUtilsBean.copyProperties(headerFull, right);
>                 String event_date = left.getOrder_at().substring(0, 10);
>                 headerFull.setEvent_date(event_date);
>                 headerFull.setItem_id(right.getId());
>                 collector.collect(headerFull);
>             }
>         });
>
> Join with SQL:
>
> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled", "true");
> conf.setString("table.exec.mini-batch.allow-latency", "15 s");
> conf.setString("table.exec.mini-batch.size", "100");
> conf.setString("table.exec.state.ttl", "20 s");
> env.configure(conf);
>
> Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>
> streamTableEnvironment.createTemporaryView("header", headerTable);
> streamTableEnvironment.createTemporaryView("item", itemTable);
>
> Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
>     ",item.goods_id" +
>     ",header.id" +
>     ",header.order_status" +
>     ",header.shop_id" +
>     ",header.parent_order_id" +
>     ",header.order_at" +
>     ",header.pay_at" +
>     ",header.channel_id" +
>     ",header.root_order_id" +
>     ",item.id" +
>     ",item.row_num" +
>     ",item.p_sp_sub_amt" +
>     ",item.display_qty" +
>     ",item.qty" +
>     ",item.bom_type" +
>     " from header JOIN item on header.id = item.order_id");
>
> DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);
>
> I don't quite understand why the interval join loses so much data. As I
> understand it, the SQL join should also use something like an interval
> join under the hood, so why is the difference between the joined results
> of the two approaches so large?
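Not part of the original mail, but since the report hinges on "the watermark is generated directly from the timestamp carried by the Kafka records", here is a minimal sketch of what that setup typically looks like with the KafkaSource API. The server, topic, OrderHeaderSchema, and source names are placeholders, not taken from the mail:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // Sketch only: all names below are placeholders, not from the original mail.
    KafkaSource<OrderHeader> headerSource = KafkaSource.<OrderHeader>builder()
            .setBootstrapServers("kafka:9092")                  // placeholder
            .setTopics("order_header")                          // placeholder
            .setValueOnlyDeserializer(new OrderHeaderSchema())  // hypothetical DeserializationSchema
            .build();

    // fromSource() keeps the timestamp Kafka attached to each record as its
    // event time; the strategy chosen here assumes strictly ascending
    // timestamps, so any out-of-order record arrives behind the watermark and
    // event-time operators such as the interval join may treat it as late.
    DataStream<OrderHeader> headerStream = env.fromSource(
            headerSource,
            WatermarkStrategy.forMonotonousTimestamps(),
            "header-source");

    // A bounded-out-of-orderness strategy instead tolerates a fixed amount of
    // disorder (here 5 seconds) before records count as late:
    // WatermarkStrategy.<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(5))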