Flink version: 1.14.4. I am currently using a Flink interval join to correlate two streams. While testing I noticed a problem: after the interval join, records are lost, but when I do a plain join directly with the SQL API the data is complete and nothing is missing. The watermark is generated directly from the timestamp that comes with the Kafka records.
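I did not paste the source setup, so for reference here is a minimal sketch of what "generating the watermark from the Kafka timestamp" means in my job. The broker/topic/group names, the String deserializer, and the use of forMonotonousTimestamps() are illustrative placeholders, not the exact production code:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HeaderSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder broker/topic names; the real job deserializes OrderHeader records.
            KafkaSource<String> headerSource = KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("order_header")
                    .setGroupId("header-join-test")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // The KafkaSource already assigns the Kafka record timestamp as the event
            // timestamp; the strategy below only turns those timestamps into watermarks
            // (no extra out-of-orderness allowance is configured in this sketch).
            DataStream<String> headerRaw = env.fromSource(
                    headerSource,
                    WatermarkStrategy.forMonotonousTimestamps(),
                    "order-header-source");

            headerRaw.print();
            env.execute("header-source-sketch");
        }
    }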
Here is the code --- interval join:

    SingleOutputStreamOperator<HeaderFull> headerFullStream = headerFilterStream
            .keyBy(data -> data.getId())
            .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
            .between(Time.seconds(-10), Time.seconds(20))
            .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
                @Override
                public void processElement(OrderHeader left, OrderItem right, Context context,
                                           Collector<HeaderFull> collector) throws Exception {
                    HeaderFull headerFull = new HeaderFull();
                    BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
                    beanUtilsBean.copyProperties(headerFull, left);
                    beanUtilsBean.copyProperties(headerFull, right);
                    String event_date = left.getOrder_at().substring(0, 10);
                    headerFull.setEvent_date(event_date);
                    headerFull.setItem_id(right.getId());
                    collector.collect(headerFull);
                }
            });

Join using SQL:

    Configuration conf = new Configuration();
    conf.setString("table.exec.mini-batch.enabled", "true");
    conf.setString("table.exec.mini-batch.allow-latency", "15 s");
    conf.setString("table.exec.mini-batch.size", "100");
    conf.setString("table.exec.state.ttl", "20 s");
    env.configure(conf);
    Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
    Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
    streamTableEnvironment.createTemporaryView("header", headerTable);
    streamTableEnvironment.createTemporaryView("item", itemTable);
    Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
            ",item.goods_id" +
            ",header.id" +
            ",header.order_status" +
            ",header.shop_id" +
            ",header.parent_order_id" +
            ",header.order_at" +
            ",header.pay_at" +
            ",header.channel_id" +
            ",header.root_order_id" +
            ",item.id" +
            ",item.row_num" +
            ",item.p_sp_sub_amt" +
            ",item.display_qty" +
            ",item.qty" +
            ",item.bom_type" +
            " from header JOIN item on header.id = item.order_id");
    DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);

I don't understand why the interval join drops so much data. As I understand it, the SQL join should also use something similar to an interval join under the hood, so why is the difference between the two joined result sets so large?
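To clarify the last point: when I say I expected the SQL join to behave like an interval join, what I had in mind is a time-windowed join with an explicit time predicate, roughly like the sketch below. This is only an illustration of the comparison I am making; header_rowtime and item_rowtime are placeholder event-time attributes that the fromDataStream() conversion above does not actually declare:

    // Sketch only: a SQL interval join needs event-time attributes on both views
    // and an explicit time range in the join condition. header_rowtime and
    // item_rowtime are assumed placeholder columns, not present in my current tables.
    Table intervalResult = streamTableEnvironment.sqlQuery(
            "SELECT header.id, item.id AS item_id" +
            " FROM header JOIN item" +
            " ON header.id = item.order_id" +
            " AND item.item_rowtime BETWEEN header.header_rowtime - INTERVAL '10' SECOND" +
            " AND header.header_rowtime + INTERVAL '20' SECOND");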