flink 版本:1.14.4
目前在使用flink interval join进行数据关联,在测试的时候发现一个问题,就是使用interval join完之后数据会丢失,但是使用sql 
api,直接进行join,数据是正常的,没有丢失。
水印是直接使用kafka 自带的时间戳生成watermark


以下是代码 ---interval join 

SingleOutputStreamOperator<HeaderFull> headerFullStream = 
headerFilterStream.keyBy(data -> data.getId())
.intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
.between(Time.seconds(-10), Time.seconds(20))
.process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
@Override
public void processElement(OrderHeader left, OrderItem right, Context context, 
Collector<HeaderFull> collector) throws Exception {
HeaderFull headerFull = new HeaderFull();
BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
beanUtilsBean.copyProperties(headerFull, left);
beanUtilsBean.copyProperties(headerFull, right);
String event_date = left.getOrder_at().substring(0, 10);
headerFull.setEvent_date(event_date);
headerFull.setItem_id(right.getId());
collector.collect(headerFull);
}
        }
使用sql 进行join
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15 s");
conf.setString("table.exec.mini-batch.size","100");
conf.setString("table.exec.state.ttl","20 s");
env.configure(conf);
Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);


streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);

Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from header JOIN item on header.id = item.order_id");


DataStream<Row> rowDataStream = 
streamTableEnvironment.toChangelogStream(result);
不太理解为什么使用interval join会丢这么多数据,按照我的理解使用sql join,底层应该也是用的类似interval 
join,为啥两者最终关联上的结果差异这么大。










回复