Hi, could you share the concrete plan so that everyone can take a look?

You can print the relevant information directly with
tEnv.executeSql("EXPLAIN JSON_EXECUTION_PLAN <YOUR_QUERY>").print().

Best,
Shengkai

lxk <lxk7...@163.com> wrote on Fri, Jun 10, 2022 at 10:29:

> Flink version: 1.14.4
> We are currently using a Flink interval join to correlate two streams.
> During testing we noticed that records are lost after the interval join,
> while a plain join through the SQL API returns the data intact, with
> nothing missing. Watermarks are generated directly from the timestamps
> carried by the Kafka records.
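
(For reference, a minimal sketch of what "watermarks from the Kafka record
timestamps" usually looks like with the KafkaSource connector; env,
kafkaSource, and the 5 s bound below are assumptions, not taken from the
original job:)

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// env is the StreamExecutionEnvironment and kafkaSource a
// KafkaSource<OrderHeader> built elsewhere (both assumed).
DataStream<OrderHeader> headerStream = env.fromSource(
        kafkaSource,
        // The Kafka record timestamp is used as event time; allowing a few
        // seconds of out-of-orderness keeps slightly late records from being
        // discarded by event-time operators such as the interval join.
        WatermarkStrategy.<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "order-header-source");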
>
>
> Here is the code --- the interval join:
>
> SingleOutputStreamOperator<HeaderFull> headerFullStream = headerFilterStream
>         .keyBy(data -> data.getId())
>         .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>         .between(Time.seconds(-10), Time.seconds(20))
>         .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>             @Override
>             public void processElement(OrderHeader left, OrderItem right,
>                     Context context, Collector<HeaderFull> collector) throws Exception {
>                 HeaderFull headerFull = new HeaderFull();
>                 BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>                 beanUtilsBean.copyProperties(headerFull, left);
>                 beanUtilsBean.copyProperties(headerFull, right);
>                 String event_date = left.getOrder_at().substring(0, 10);
>                 headerFull.setEvent_date(event_date);
>                 headerFull.setItem_id(right.getId());
>                 collector.collect(headerFull);
>             }
>         });
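
(One way to check whether the loss is caused by late records being dropped
by the interval join: compare each element's timestamp with the current
watermark before the join and route late ones to a side output. A minimal
sketch assuming timestamps are already assigned; the tag and stream names
are made up:)

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical tag for elements already behind the watermark.
final OutputTag<OrderItem> lateItems = new OutputTag<OrderItem>("late-items") {};

SingleOutputStreamOperator<OrderItem> checkedItems = filterItemStream
        .process(new ProcessFunction<OrderItem, OrderItem>() {
            @Override
            public void processElement(OrderItem value, Context ctx,
                    Collector<OrderItem> out) {
                // An element at or behind the current watermark can no longer
                // join with state the interval join has already cleaned up.
                if (ctx.timestamp() <= ctx.timerService().currentWatermark()) {
                    ctx.output(lateItems, value);
                } else {
                    out.collect(value);
                }
            }
        });
// checkedItems.getSideOutput(lateItems).print("LATE");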
> Join using the SQL API:
> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled","true");
> conf.setString("table.exec.mini-batch.allow-latency","15 s");
> conf.setString("table.exec.mini-batch.size","100");
> conf.setString("table.exec.state.ttl","20 s");
> env.configure(conf);
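
(Side note: the table.exec.* options are usually set on the
TableEnvironment's own config; whether env.configure(conf) propagates them
to the planner may depend on the version. A sketch of the TableConfig
route, under that assumption:)

import org.apache.flink.configuration.Configuration;

Configuration tableConf = streamTableEnvironment.getConfig().getConfiguration();
tableConf.setString("table.exec.mini-batch.enabled", "true");
tableConf.setString("table.exec.mini-batch.allow-latency", "15 s");
tableConf.setString("table.exec.mini-batch.size", "100");
// With a 20 s TTL, the regular join's state for an idle key also expires
// after 20 s.
tableConf.setString("table.exec.state.ttl", "20 s");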
> Table headerTable =
> streamTableEnvironment.fromDataStream(headerFilterStream);
> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>
>
> streamTableEnvironment.createTemporaryView("header",headerTable);
> streamTableEnvironment.createTemporaryView("item",itemTable);
>
> Table result = streamTableEnvironment.sqlQuery("select header.customer_id"
> +
> ",item.goods_id" +
> ",header.id" +
> ",header.order_status" +
> ",header.shop_id" +
> ",header.parent_order_id" +
> ",header.order_at" +
> ",header.pay_at" +
> ",header.channel_id" +
> ",header.root_order_id" +
> ",item.id" +
> ",item.row_num" +
> ",item.p_sp_sub_amt" +
> ",item.display_qty" +
> ",item.qty" +
> ",item.bom_type" +
> " from header JOIN item on header.id = item.order_id");
>
>
> DataStream<Row> rowDataStream =
> streamTableEnvironment.toChangelogStream(result);
> I don't quite understand why the interval join loses so much data. My
> understanding was that the SQL join would also be implemented with
> something like an interval join underneath, so why do the two end up with
> such different join results?
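
(For comparison: a plain JOIN like the one above is a regular join, which
keeps both inputs in state, subject to table.exec.state.ttl; it is not an
interval join. The SQL counterpart of the DataStream interval join needs
time-attribute predicates. A sketch, assuming both tables declare a rowtime
attribute when the streams are converted:)

Table intervalResult = streamTableEnvironment.sqlQuery(
        "SELECT header.id, item.goods_id " +
        "FROM header, item " +
        "WHERE header.id = item.order_id " +
        // The event-time range predicate is what makes the planner pick an
        // interval join instead of a regular join; it mirrors
        // between(Time.seconds(-10), Time.seconds(20)).
        "AND item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND " +
        "AND header.rowtime + INTERVAL '20' SECOND");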