I have now switched to a SQL interval join; the code and execution plan are below. No other configuration changed. The result is still missing records, whereas the plain inner join has no such problem.
Table headerTable =
streamTableEnvironment.fromDataStream(headerFilterStream, Schema.newBuilder()
.columnByExpression("rowtime",
"CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
.build());
Table itemTable =
streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder()
.columnByExpression("rowtime",
"CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))")
.watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
.build());
streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);
// Build the query once and reuse it for both sqlQuery and explainSql,
// so the printed plan is guaranteed to match the executed job.
String intervalJoinSql = "select header.customer_id" +
        ",item.goods_id" +
        ",header.id" +
        ",header.order_status" +
        ",header.shop_id" +
        ",header.parent_order_id" +
        ",header.order_at" +
        ",header.pay_at" +
        ",header.channel_id" +
        ",header.root_order_id" +
        ",item.id" +
        ",item.row_num" +
        ",item.p_sp_sub_amt" +
        ",item.display_qty" +
        ",item.qty" +
        ",item.bom_type" +
        " from header JOIN item on header.id = item.order_id" +
        " and item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
        " and header.rowtime + INTERVAL '20' SECOND";
Table result = streamTableEnvironment.sqlQuery(intervalJoinSql);
String intervalJoin = streamTableEnvironment.explainSql(intervalJoinSql);
System.out.println(intervalJoin);
DataStream<Row> rowDataStream =
streamTableEnvironment.toChangelogStream(result);
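
A side note on the TO_TIMESTAMP errors mentioned further down the thread: for strings shaped like 2022-06-10 13:08:55, TO_TIMESTAMP with an explicit pattern should also work as the rowtime expression. A minimal sketch, assuming the first 19 characters of last_updated_at always match 'yyyy-MM-dd HH:mm:ss' (headerTableAlt is just an illustrative name):

// Alternative rowtime conversion (sketch): TO_TIMESTAMP returns TIMESTAMP(3),
// which is also a valid event-time attribute type for an interval join.
// SUBSTRING here is 1-based, the standard SQL convention.
Table headerTableAlt =
        streamTableEnvironment.fromDataStream(headerFilterStream, Schema.newBuilder()
                .columnByExpression("rowtime",
                        "TO_TIMESTAMP(SUBSTRING(last_updated_at, 1, 19), 'yyyy-MM-dd HH:mm:ss')")
                .watermark("rowtime", "rowtime - INTERVAL '2' SECOND")
                .build());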
Execution plan:
== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$16], id=[$0], order_status=[$1],
shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6],
channel_id=[$7], root_order_id=[$8], id0=[$13], row_num=[$15],
p_sp_sub_amt=[$20], display_qty=[$23], qty=[$18], bom_type=[$21])
+- LogicalJoin(condition=[AND(=($0, $14), >=($25, -($12, 10000:INTERVAL
SECOND)), <=($25, +($12, 20000:INTERVAL SECOND)))], joinType=[inner])
:- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12,
2000:INTERVAL SECOND)])
: +- LogicalProject(id=[$0], order_status=[$1], customer_id=[$2],
shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6],
channel_id=[$7], root_order_id=[$8], last_updated_at=[$9], business_flag=[$10],
mysql_op_type=[$11], rowtime=[CAST(SUBSTRING($9, 0,
19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
: +- LogicalTableScan(table=[[default_catalog, default_database,
Unregistered_DataStream_Source_5]])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($12,
2000:INTERVAL SECOND)])
+- LogicalProject(id=[$0], order_id=[$1], row_num=[$2], goods_id=[$3],
s_sku_code=[$4], qty=[$5], p_paid_sub_amt=[$6], p_sp_sub_amt=[$7],
bom_type=[$8], last_updated_at=[$9], display_qty=[$10], is_first_flag=[$11],
rowtime=[CAST(SUBSTRING($9, 0, 19)):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)])
+- LogicalTableScan(table=[[default_catalog, default_database,
Unregistered_DataStream_Source_8]])
== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id,
order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt,
display_qty, qty, bom_type])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-20000, leftUpperBound=10000, leftTimeIndex=9,
rightTimeIndex=8], where=[AND(=(id, order_id), >=(rowtime0, -(rowtime,
10000:INTERVAL SECOND)), <=(rowtime0, +(rowtime, 20000:INTERVAL SECOND)))],
select=[id, order_status, customer_id, shop_id, parent_order_id, order_at,
pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id,
qty, p_sp_sub_amt, bom_type, display_qty, rowtime0])
:- Exchange(distribution=[hash[id]])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
2000:INTERVAL SECOND)])
: +- Calc(select=[id, order_status, customer_id, shop_id,
parent_order_id, order_at, pay_at, channel_id, root_order_id,
CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
: +- TableSourceScan(table=[[default_catalog, default_database,
Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id,
shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id,
last_updated_at, business_flag, mysql_op_type])
+- Exchange(distribution=[hash[order_id]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime,
2000:INTERVAL SECOND)])
+- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt,
bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
+- TableSourceScan(table=[[default_catalog, default_database,
Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id,
s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at,
display_qty, is_first_flag])
== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id,
order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt,
display_qty, qty, bom_type])
+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true,
leftLowerBound=-20000, leftUpperBound=10000, leftTimeIndex=9,
rightTimeIndex=8], where=[((id = order_id) AND (rowtime0 >= (rowtime -
10000:INTERVAL SECOND)) AND (rowtime0 <= (rowtime + 20000:INTERVAL SECOND)))],
select=[id, order_status, customer_id, shop_id, parent_order_id, order_at,
pay_at, channel_id, root_order_id, rowtime, id0, order_id, row_num, goods_id,
qty, p_sp_sub_amt, bom_type, display_qty, rowtime0])
:- Exchange(distribution=[hash[id]])
: +- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
2000:INTERVAL SECOND)])
: +- Calc(select=[id, order_status, customer_id, shop_id,
parent_order_id, order_at, pay_at, channel_id, root_order_id,
CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
: +- TableSourceScan(table=[[default_catalog, default_database,
Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id,
shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id,
last_updated_at, business_flag, mysql_op_type])
+- Exchange(distribution=[hash[order_id]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime -
2000:INTERVAL SECOND)])
+- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt,
bom_type, display_qty, CAST(SUBSTRING(last_updated_at, 0, 19)) AS rowtime])
+- TableSourceScan(table=[[default_catalog, default_database,
Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id,
s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at,
display_qty, is_first_flag])
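
Reading the plan: windowBounds=[leftLowerBound=-20000, leftUpperBound=10000] is the BETWEEN -10s/+20s condition re-expressed from the header (left) side, so the query really is compiled as a row-time IntervalJoin. Unlike the regular join, a row-time interval join can only match records whose timestamps are still covered by the watermark; records arriving behind it are silently dropped. A rough way to look for such late records, assuming timestamps and watermarks are assigned on headerFilterStream (sketch only; here the SQL rowtime is derived from last_updated_at, so this is just an approximation):

// Emit only the header records whose event timestamp is already behind the
// current watermark on arrival; these can no longer match in a row-time
// interval join, although a regular join (state-TTL based) still matches them.
// ProcessFunction is org.apache.flink.streaming.api.functions.ProcessFunction.
headerFilterStream
        .process(new ProcessFunction<OrderHeader, OrderHeader>() {
            @Override
            public void processElement(OrderHeader value, Context ctx,
                                       Collector<OrderHeader> out) {
                Long ts = ctx.timestamp();
                if (ts != null && ts < ctx.timerService().currentWatermark()) {
                    out.collect(value); // late: event time behind the watermark
                }
            }
        })
        .print("late-header"); // a sink is needed so this diagnostic branch runs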
On 2022-06-10 17:16:31, "lxk" <lxk7...@163.com> wrote:
>Doing the interval join in SQL, my current problem is that the time conversion feels unfriendly. The event-time field in my stream is a string, shaped like 2022-06-10 13:08:55, but converting it with the TO_TIMESTAMP function kept failing with errors.
>
>On 2022-06-10 15:04:31, "Xuyang" <xyzhong...@163.com> wrote:
>>Hi, the DataStream interval join API corresponds to the interval join in SQL, but the SQL you wrote is a regular join. A regular join and an interval join differ in both business semantics and implementation, so directly comparing the DataStream API's interval join against a regular SQL join is not really valid. That's why my earlier suggestion was to also use an interval join in SQL, so that the two sides become comparable.
>>
>>
>>Also, the table.exec.state.ttl parameter you set in SQL only means that state is cleared of expired data after 20 s, while the time window you are comparing is -10 s to +20 s, which does not look like the same thing.
>>
>>--
>>
>> Best!
>> Xuyang
>>
>>On 2022-06-10 14:33:37, "lxk" <lxk7...@163.com> wrote:
>>>
>>>What I don't understand is this: the time window I open for the interval join is longer than the state retention I set in SQL -- the window bounds are -10 s and +20 s -- so why is data still lost?
>>>
>>>I set the table.exec.state.ttl parameter in SQL to 20 s, so in principle both streams should likewise keep 20 s of data in state for the join. I'm not sure whether my understanding is wrong; I hope someone can explain.
>>>
>>>On 2022-06-10 14:15:29, "Xuyang" <xyzhong...@163.com> wrote:
>>>>Hi, this SQL of yours is not an interval join; it is a regular join.
>>>>See the documentation [1] for how interval joins are used. You could try whether a SQL interval join loses data (mind the state TTL setting), to determine whether this is a data problem or a DataStream API problem.
>>>>
>>>>[1]
>>>>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
>>>>
>>>>--
>>>>
>>>> Best!
>>>> Xuyang
>>>>
>>>>On 2022-06-10 11:26:33, "lxk" <lxk7...@163.com> wrote:
>>>>>I used the following code:
>>>>>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>>>>>",item.goods_id" +
>>>>>",header.id" +
>>>>>",header.order_status" +
>>>>>",header.shop_id" +
>>>>>",header.parent_order_id" +
>>>>>",header.order_at" +
>>>>>",header.pay_at" +
>>>>>",header.channel_id" +
>>>>>",header.root_order_id" +
>>>>>",item.id" +
>>>>>",item.row_num" +
>>>>>",item.p_sp_sub_amt" +
>>>>>",item.display_qty" +
>>>>>",item.qty" +
>>>>>",item.bom_type" +
>>>>>" from header JOIN item on header.id = item.order_id");
>>>>>
>>>>>System.out.println("explain:" + s);
>>>>>
>>>>>The plan is:
>>>>>explain:== Abstract Syntax Tree ==
>>>>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0],
>>>>>order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5],
>>>>>pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12],
>>>>>row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17],
>>>>>bom_type=[$20])
>>>>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>>>>> :- LogicalTableScan(table=[[default_catalog, default_database,
>>>>> Unregistered_DataStream_Source_5]])
>>>>> +- LogicalTableScan(table=[[default_catalog, default_database,
>>>>> Unregistered_DataStream_Source_8]])
>>>>>
>>>>>
>>>>>== Optimized Physical Plan ==
>>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id,
>>>>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0,
>>>>>row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>>>>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id,
>>>>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at,
>>>>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty,
>>>>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey],
>>>>>rightInputSpec=[NoUniqueKey])
>>>>> :- Exchange(distribution=[hash[id]])
>>>>> : +- Calc(select=[id, order_status, customer_id, shop_id,
>>>>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>>> : +- TableSourceScan(table=[[default_catalog, default_database,
>>>>> Unregistered_DataStream_Source_5]], fields=[id, order_status,
>>>>> customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id,
>>>>> root_order_id, last_updated_at, business_flag, mysql_op_type])
>>>>> +- Exchange(distribution=[hash[order_id]])
>>>>> +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt,
>>>>> bom_type, display_qty])
>>>>> +- TableSourceScan(table=[[default_catalog, default_database,
>>>>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num,
>>>>> goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type,
>>>>> last_updated_at, display_qty, is_first_flag])
>>>>>
>>>>>
>>>>>== Optimized Execution Plan ==
>>>>>Calc(select=[customer_id, goods_id, id, order_status, shop_id,
>>>>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0,
>>>>>row_num, p_sp_sub_amt, display_qty, qty, bom_type])
>>>>>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id,
>>>>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at,
>>>>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty,
>>>>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey],
>>>>>rightInputSpec=[NoUniqueKey])
>>>>> :- Exchange(distribution=[hash[id]])
>>>>> : +- Calc(select=[id, order_status, customer_id, shop_id,
>>>>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>>>> : +- TableSourceScan(table=[[default_catalog, default_database,
>>>>> Unregistered_DataStream_Source_5]], fields=[id, order_status,
>>>>> customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id,
>>>>> root_order_id, last_updated_at, business_flag, mysql_op_type])
>>>>> +- Exchange(distribution=[hash[order_id]])
>>>>> +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt,
>>>>> bom_type, display_qty])
>>>>> +- TableSourceScan(table=[[default_catalog, default_database,
>>>>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num,
>>>>> goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type,
>>>>> last_updated_at, display_qty, is_first_flag])
>>>>>
>>>>>On 2022-06-10 11:02:56, "Shengkai Fang" <fskm...@gmail.com> wrote:
>>>>>>Hi, could you provide the concrete plan for everyone to take a look?
>>>>>>
>>>>>>You can print the relevant information directly with tEnv.executeSql("Explain JSON_EXECUTION_PLAN <YOUR_QUERY>").print().
>>>>>>
>>>>>>Best,
>>>>>>Shengkai
>>>>>>
>>>>>>lxk <lxk7...@163.com> wrote on Friday, June 10, 2022 at 10:29:
>>>>>>
>>>>>>> Flink version: 1.14.4
>>>>>>> I am currently using Flink's interval join to correlate data. While testing I found a problem: data is lost after the interval join, whereas joining directly with the SQL API produces normal results with nothing missing.
>>>>>>> The watermark is generated directly from the Kafka record timestamps.
>>>>>>>
>>>>>>>
>>>>>>> The code follows --- interval join:
>>>>>>>
>>>>>>> SingleOutputStreamOperator<HeaderFull> headerFullStream =
>>>>>>> headerFilterStream.keyBy(data -> data.getId())
>>>>>>> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>>>>>> .between(Time.seconds(-10), Time.seconds(20))
>>>>>>> .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
>>>>>>> @Override
>>>>>>> public void processElement(OrderHeader left, OrderItem right, Context
>>>>>>> context, Collector<HeaderFull> collector) throws Exception {
>>>>>>> HeaderFull headerFull = new HeaderFull();
>>>>>>> BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>>>>>>> beanUtilsBean.copyProperties(headerFull, left);
>>>>>>> beanUtilsBean.copyProperties(headerFull, right);
>>>>>>> String event_date = left.getOrder_at().substring(0, 10);
>>>>>>> headerFull.setEvent_date(event_date);
>>>>>>> headerFull.setItem_id(right.getId());
>>>>>>> collector.collect(headerFull);
>>>>>>> }
>>>>>>> });
>>>>>>> Joining with SQL:
>>>>>>> Configuration conf = new Configuration();
>>>>>>> conf.setString("table.exec.mini-batch.enabled","true");
>>>>>>> conf.setString("table.exec.mini-batch.allow-latency","15 s");
>>>>>>> conf.setString("table.exec.mini-batch.size","100");
>>>>>>> conf.setString("table.exec.state.ttl","20 s");
>>>>>>> env.configure(conf);
>>>>>>> Table headerTable =
>>>>>>> streamTableEnvironment.fromDataStream(headerFilterStream);
>>>>>>> Table itemTable =
>>>>>>> streamTableEnvironment.fromDataStream(filterItemStream);
>>>>>>>
>>>>>>>
>>>>>>> streamTableEnvironment.createTemporaryView("header",headerTable);
>>>>>>> streamTableEnvironment.createTemporaryView("item",itemTable);
>>>>>>>
>>>>>>> Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
>>>>>>> ",item.goods_id" +
>>>>>>> ",header.id" +
>>>>>>> ",header.order_status" +
>>>>>>> ",header.shop_id" +
>>>>>>> ",header.parent_order_id" +
>>>>>>> ",header.order_at" +
>>>>>>> ",header.pay_at" +
>>>>>>> ",header.channel_id" +
>>>>>>> ",header.root_order_id" +
>>>>>>> ",item.id" +
>>>>>>> ",item.row_num" +
>>>>>>> ",item.p_sp_sub_amt" +
>>>>>>> ",item.display_qty" +
>>>>>>> ",item.qty" +
>>>>>>> ",item.bom_type" +
>>>>>>> " from header JOIN item on header.id = item.order_id");
>>>>>>>
>>>>>>>
>>>>>>> DataStream<Row> rowDataStream =
>>>>>>> streamTableEnvironment.toChangelogStream(result);
>>>>>>> I don't quite understand why the interval join drops so much data. As I understand it, the SQL join should also use something like an interval join underneath, so why do the two end up with such different join results?