Re:Re:Re:Re: Question about data loss with Flink interval join

2022-06-09 Post by lxk



What I don't understand is this: the time window I set for the interval join is longer than the state TTL I configured in SQL, with lower and upper bounds of -10s and 20s respectively, so why is data still being lost?

In SQL I set the table.exec.state.ttl parameter
to 20s, so in theory both streams should also keep 20s of data in state for the join. I'm not sure whether my understanding is off here, and would appreciate an explanation.














On 2022-06-10 14:15:29, "Xuyang" wrote:
>Hi, this SQL of yours is not an interval join; it is a regular join.
>The documentation on how to use interval joins is at [1]. You could try a SQL interval
>join and see whether it loses data (remember to set the state TTL), to determine whether the problem is with the data or with the DataStream API.
>
>
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
>
>
>
>
>
>
>
>--
>
>Best!
>Xuyang
>
>
>
>
>
>On 2022-06-10 11:26:33, "lxk" wrote:
>>I'm using the following code:
>>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>>",item.goods_id" +
>>",header.id" +
>>",header.order_status" +
>>",header.shop_id" +
>>",header.parent_order_id" +
>>",header.order_at" +
>>",header.pay_at" +
>>",header.channel_id" +
>>",header.root_order_id" +
>>",item.id" +
>>",item.row_num" +
>>",item.p_sp_sub_amt" +
>>",item.display_qty" +
>>",item.qty" +
>>",item.bom_type" +
>>" from header JOIN item on header.id = item.order_id");
>>
>>System.out.println("explain:" + s);
>>
>>
>>
>>
>>The plan is:
>>explain:== Abstract Syntax Tree ==
>>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], 
>>shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
>>channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], 
>>p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>>   :- LogicalTableScan(table=[[default_catalog, default_database, 
>> Unregistered_DataStream_Source_5]])
>>   +- LogicalTableScan(table=[[default_catalog, default_database, 
>> Unregistered_DataStream_Source_8]])
>>
>>
>>== Optimized Physical Plan ==
>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, 
>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, 
>>p_sp_sub_amt, display_qty, qty, bom_type])
>>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, 
>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, 
>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
>>rightInputSpec=[NoUniqueKey])
>>   :- Exchange(distribution=[hash[id]])
>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, 
>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>   : +- TableSourceScan(table=[[default_catalog, default_database, 
>> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
>> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
>> last_updated_at, business_flag, mysql_op_type])
>>   +- Exchange(distribution=[hash[order_id]])
>>  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
>> bom_type, display_qty])
>> +- TableSourceScan(table=[[default_catalog, default_database, 
>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
>> s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
>> display_qty, is_first_flag])
>>
>>
>>== Optimized Execution Plan ==
>>Calc(select=[customer_id, goods_id, id, order_status, shop_id, 
>>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, 
>>p_sp_sub_amt, display_qty, qty, bom_type])
>>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, 
>>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
>>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, 
>>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
>>rightInputSpec=[NoUniqueKey])
>>   :- Exchange(distribution=[hash[id]])
>>   :  +- Calc(select=[id, order_status, customer_id, shop_id, 
>> parent_order_id, order_at, pay_at, channel_id, root_order_id])
>>   : +- TableSourceScan(table=[[default_catalog, default_database, 
>> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
>> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
>> last_updated_at, business_flag, mysql_op_type])
>>   +- Exchange(distribution=[hash[order_id]])
>>  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
>> bom_type, display_qty])
>> +- TableSourceScan(table=[[default_catalog, default_database, 
>> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
>> s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
>> display_qty, is_first_flag])
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>On 2022-06-10 11:02:56, "Shengkai Fang" wrote:
>>>Hi, could you share the full plan so everyone can take a look?
>>>
>>>You can directly use tEnv.executeSql("Explain JSON_EXECUTION_PLAN
>>>").print() to print the relevant information.
>>>
>>>Best,
>>>Shengkai
>>>
>>>lxk wrote on Fri, Jun 10, 2022 at 10:29:
>>>
 Flink version: 1.14.4
 I'm currently using a Flink interval join to correlate data, and during testing I noticed a problem: after the interval
 join, data is lost, but with the SQL API, doing a plain join, the data is complete and nothing is lost.
 Watermarks are generated directly from the timestamps carried by the Kafka records.


 Below is the code ---interval j

Re:Re:Re: Question about data loss with Flink interval join

2022-06-09 Post by Xuyang
Hi, this SQL of yours is not an interval join; it is a regular join.
The documentation on how to use interval joins is at [1]. You could try a SQL interval
join and see whether it loses data (remember to set the state TTL), to determine whether the problem is with the data or with the DataStream API.




[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins
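
As a rough sketch, an interval-join version of the query could look something like the code below. This assumes both tables expose an event-time attribute (hypothetically named rowtime here) with a watermark defined when converting the DataStreams, which is not shown in the code so far; without time attributes on both sides the planner falls back to a regular join. The column list is shortened:

Table intervalResult = streamTableEnvironment.sqlQuery(
    "SELECT header.customer_id, item.goods_id, header.id, item.id AS item_id" +
    " FROM header JOIN item" +
    " ON header.id = item.order_id" +
    " AND item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
    " AND header.rowtime + INTERVAL '20' SECOND");

The BETWEEN bounds mirror the -10s/+20s used in the DataStream interval join; if the optimized plan then shows an IntervalJoin operator instead of a plain Join, the query is being executed as an interval join.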







--

Best!
Xuyang





On 2022-06-10 11:26:33, "lxk" wrote:
>I'm using the following code:
>String s = streamTableEnvironment.explainSql("select header.customer_id" +
>",item.goods_id" +
>",header.id" +
>",header.order_status" +
>",header.shop_id" +
>",header.parent_order_id" +
>",header.order_at" +
>",header.pay_at" +
>",header.channel_id" +
>",header.root_order_id" +
>",item.id" +
>",item.row_num" +
>",item.p_sp_sub_amt" +
>",item.display_qty" +
>",item.qty" +
>",item.bom_type" +
>" from header JOIN item on header.id = item.order_id");
>
>System.out.println("explain:" + s);
>
>
>
>
>The plan is:
>explain:== Abstract Syntax Tree ==
>LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], 
>shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
>channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], 
>p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
>+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
>   :- LogicalTableScan(table=[[default_catalog, default_database, 
> Unregistered_DataStream_Source_5]])
>   +- LogicalTableScan(table=[[default_catalog, default_database, 
> Unregistered_DataStream_Source_8]])
>
>
>== Optimized Physical Plan ==
>Calc(select=[customer_id, goods_id, id, order_status, shop_id, 
>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, 
>p_sp_sub_amt, display_qty, qty, bom_type])
>+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, 
>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, 
>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
>rightInputSpec=[NoUniqueKey])
>   :- Exchange(distribution=[hash[id]])
>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, 
> order_at, pay_at, channel_id, root_order_id])
>   : +- TableSourceScan(table=[[default_catalog, default_database, 
> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
> last_updated_at, business_flag, mysql_op_type])
>   +- Exchange(distribution=[hash[order_id]])
>  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
> bom_type, display_qty])
> +- TableSourceScan(table=[[default_catalog, default_database, 
> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
> s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
> display_qty, is_first_flag])
>
>
>== Optimized Execution Plan ==
>Calc(select=[customer_id, goods_id, id, order_status, shop_id, 
>parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, 
>p_sp_sub_amt, display_qty, qty, bom_type])
>+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, 
>order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
>channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, 
>p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
>rightInputSpec=[NoUniqueKey])
>   :- Exchange(distribution=[hash[id]])
>   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, 
> order_at, pay_at, channel_id, root_order_id])
>   : +- TableSourceScan(table=[[default_catalog, default_database, 
> Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
> shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
> last_updated_at, business_flag, mysql_op_type])
>   +- Exchange(distribution=[hash[order_id]])
>  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
> bom_type, display_qty])
> +- TableSourceScan(table=[[default_catalog, default_database, 
> Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
> s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
> display_qty, is_first_flag])
>
>
>
>
>
>
>
>
>
>
>
>
>
>On 2022-06-10 11:02:56, "Shengkai Fang" wrote:
>>Hi, could you share the full plan so everyone can take a look?
>>
>>You can directly use tEnv.executeSql("Explain JSON_EXECUTION_PLAN
>>").print() to print the relevant information.
>>
>>Best,
>>Shengkai
>>
>>lxk wrote on Fri, Jun 10, 2022 at 10:29:
>>
>>> Flink version: 1.14.4
>>> I'm currently using a Flink interval join to correlate data, and during testing I noticed a problem: after the interval
>>> join, data is lost, but with the SQL API, doing a plain join, the data is complete and nothing is lost.
>>> Watermarks are generated directly from the timestamps carried by the Kafka records.
>>>
>>>
>>> Below is the code ---interval join
>>>
>>> SingleOutputStreamOperator headerFullStream =
>>> headerFilterStream.keyBy(data -> data.getId())
>>> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>>> .between(Time.seconds(-10), Time.seconds(20))
>>> .process(new ProcessJoinFunction() {
>>> @Override
>>> public void processElement(OrderHeader left, OrderItem right, Context
>>>

Re:Re: Question about data loss with Flink interval join

2022-06-09 Post by lxk
I'm using the following code:
String s = streamTableEnvironment.explainSql("select header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from header JOIN item on header.id = item.order_id");

System.out.println("explain:" + s);




The plan is:
explain:== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], 
shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], 
channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], 
p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]])
   +- LogicalTableScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_8]])


== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, 
display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, 
order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id])
   : +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty])
 +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
display_qty, is_first_flag])


== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, 
display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, 
order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, 
channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, 
order_at, pay_at, channel_id, root_order_id])
   : +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, 
shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, 
last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
  +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, 
bom_type, display_qty])
 +- TableSourceScan(table=[[default_catalog, default_database, 
Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, 
s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, 
display_qty, is_first_flag])













On 2022-06-10 11:02:56, "Shengkai Fang" wrote:
>Hi, could you share the full plan so everyone can take a look?
>
>You can directly use tEnv.executeSql("Explain JSON_EXECUTION_PLAN
>").print() to print the relevant information.
>
>Best,
>Shengkai
>
>lxk wrote on Fri, Jun 10, 2022 at 10:29:
>
>> Flink version: 1.14.4
>> I'm currently using a Flink interval join to correlate data, and during testing I noticed a problem: after the interval
>> join, data is lost, but with the SQL API, doing a plain join, the data is complete and nothing is lost.
>> Watermarks are generated directly from the timestamps carried by the Kafka records.
>>
>>
>> Below is the code ---interval join
>>
>> SingleOutputStreamOperator headerFullStream =
>> headerFilterStream.keyBy(data -> data.getId())
>> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
>> .between(Time.seconds(-10), Time.seconds(20))
>> .process(new ProcessJoinFunction() {
>> @Override
>> public void processElement(OrderHeader left, OrderItem right, Context
>> context, Collector collector) throws Exception {
>> HeaderFull headerFull = new HeaderFull();
>> BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
>> beanUtilsBean.copyProperties(headerFull, left);
>> beanUtilsBean.copyProperties(headerFull, right);
>> String event_date = left.getOrder_at().substring(0, 10);
>> headerFull.setEvent_date(event_date);
>> headerFull.setItem_id(right.getId());
>> collector.collect(headerFull);
>> }
>> }
>> Join using SQL
>> Co

Re: Question about data loss with Flink interval join

2022-06-09 Post by Shengkai Fang
Hi, could you share the full plan so everyone can take a look?

You can directly use tEnv.executeSql("Explain JSON_EXECUTION_PLAN
").print() to print the relevant information.

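With the header/item views from the quoted code below, that would look roughly like this (column list shortened; streamTableEnvironment is the variable name used in the quoted code):

streamTableEnvironment.executeSql(
    "EXPLAIN JSON_EXECUTION_PLAN " +
    "SELECT header.customer_id, item.goods_id" +
    " FROM header JOIN item ON header.id = item.order_id").print();

If EXPLAIN with JSON_EXECUTION_PLAN is not accepted by your Flink version, a plain "EXPLAIN PLAN FOR <query>" also prints the plan.
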
Best,
Shengkai

lxk wrote on Fri, Jun 10, 2022 at 10:29:

> Flink version: 1.14.4
> I'm currently using a Flink interval join to correlate data, and during testing I noticed a problem: after the interval
> join, data is lost, but with the SQL API, doing a plain join, the data is complete and nothing is lost.
> Watermarks are generated directly from the timestamps carried by the Kafka records.
>
>
> Below is the code ---interval join
>
> SingleOutputStreamOperator headerFullStream =
> headerFilterStream.keyBy(data -> data.getId())
> .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
> .between(Time.seconds(-10), Time.seconds(20))
> .process(new ProcessJoinFunction() {
> @Override
> public void processElement(OrderHeader left, OrderItem right, Context
> context, Collector collector) throws Exception {
> HeaderFull headerFull = new HeaderFull();
> BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
> beanUtilsBean.copyProperties(headerFull, left);
> beanUtilsBean.copyProperties(headerFull, right);
> String event_date = left.getOrder_at().substring(0, 10);
> headerFull.setEvent_date(event_date);
> headerFull.setItem_id(right.getId());
> collector.collect(headerFull);
> }
> }
> Join using SQL
> Configuration conf = new Configuration();
> conf.setString("table.exec.mini-batch.enabled","true");
> conf.setString("table.exec.mini-batch.allow-latency","15 s");
> conf.setString("table.exec.mini-batch.size","100");
> conf.setString("table.exec.state.ttl","20 s");
> env.configure(conf);
> Table headerTable =
> streamTableEnvironment.fromDataStream(headerFilterStream);
> Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);
>
>
> streamTableEnvironment.createTemporaryView("header",headerTable);
> streamTableEnvironment.createTemporaryView("item",itemTable);
>
> Table result = streamTableEnvironment.sqlQuery("select header.customer_id"
> +
> ",item.goods_id" +
> ",header.id" +
> ",header.order_status" +
> ",header.shop_id" +
> ",header.parent_order_id" +
> ",header.order_at" +
> ",header.pay_at" +
> ",header.channel_id" +
> ",header.root_order_id" +
> ",item.id" +
> ",item.row_num" +
> ",item.p_sp_sub_amt" +
> ",item.display_qty" +
> ",item.qty" +
> ",item.bom_type" +
> " from header JOIN item on header.id = item.order_id");
>
>
> DataStream rowDataStream =
> streamTableEnvironment.toChangelogStream(result);
> I don't quite understand why the interval join loses so much data. As I understand it, the SQL join should also use something similar to an interval
> join under the hood, so why do the two end up with such different join results?
>
>
>
>
>
>
>
>
>
>
>


Question about data loss with Flink interval join

2022-06-09 Post by lxk
Flink version: 1.14.4
I'm currently using a Flink interval join to correlate data, and during testing I noticed a problem: after the interval join, data is lost, but with the SQL
API, doing a plain join, the data is complete and nothing is lost.
Watermarks are generated directly from the timestamps carried by the Kafka records.


Below is the code ---interval join

SingleOutputStreamOperator<HeaderFull> headerFullStream =
        headerFilterStream.keyBy(data -> data.getId())
                .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
                .between(Time.seconds(-10), Time.seconds(20))
                .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
                    @Override
                    public void processElement(OrderHeader left, OrderItem right, Context context,
                            Collector<HeaderFull> collector) throws Exception {
                        // Merge header and item fields into one output bean
                        HeaderFull headerFull = new HeaderFull();
                        BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
                        beanUtilsBean.copyProperties(headerFull, left);
                        beanUtilsBean.copyProperties(headerFull, right);
                        String event_date = left.getOrder_at().substring(0, 10);
                        headerFull.setEvent_date(event_date);
                        headerFull.setItem_id(right.getId());
                        collector.collect(headerFull);
                    }
                });
Join using SQL
Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15 s");
conf.setString("table.exec.mini-batch.size","100");
conf.setString("table.exec.state.ttl","20 s");
env.configure(conf);
Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);


streamTableEnvironment.createTemporaryView("header",headerTable);
streamTableEnvironment.createTemporaryView("item",itemTable);

Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
",item.goods_id" +
",header.id" +
",header.order_status" +
",header.shop_id" +
",header.parent_order_id" +
",header.order_at" +
",header.pay_at" +
",header.channel_id" +
",header.root_order_id" +
",item.id" +
",item.row_num" +
",item.p_sp_sub_amt" +
",item.display_qty" +
",item.qty" +
",item.bom_type" +
" from header JOIN item on header.id = item.order_id");


DataStream<Row> rowDataStream =
streamTableEnvironment.toChangelogStream(result);
I don't quite understand why the interval join loses so much data. As I understand it, the SQL join should also use something similar to an interval
join under the hood, so why do the two end up with such different join results?
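
For reference, the source wiring is roughly along these lines (a sketch only: the topic, group id, deserializer and out-of-orderness allowance are placeholders, and OrderHeaderDeserializer is a hypothetical name rather than the actual job code):

KafkaSource<OrderHeader> headerSource = KafkaSource.<OrderHeader>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("order_header")
        .setGroupId("header-join-test")
        .setValueOnlyDeserializer(new OrderHeaderDeserializer())
        .build();

// KafkaSource uses the Kafka record timestamp as the event timestamp by default;
// forBoundedOutOfOrderness controls how far the watermark lags behind those
// timestamps, and records that arrive behind the watermark can be dropped by
// event-time operators such as the interval join.
DataStream<OrderHeader> headerStream = env.fromSource(
        headerSource,
        WatermarkStrategy.<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(5)),
        "header-source");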












Re:flinkcdc

2022-06-09 Post by Xuyang
Hi, could you share your DDL?
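For reference, the relevant part of a mysql-cdc DDL usually looks roughly like this (a sketch only: tableEnv, the column list and the connection values are placeholders). Given the error quoted below, the thing to check is that 'table-name' matches a table that actually exists inside the configured 'database-name':

tableEnv.executeSql(
    "CREATE TABLE hand_source (" +
    "  id BIGINT," +
    "  PRIMARY KEY (id) NOT ENFORCED" +
    ") WITH (" +
    "  'connector' = 'mysql-cdc'," +
    "  'hostname' = 'localhost'," +
    "  'port' = '3306'," +
    "  'username' = 'flink_user'," +
    "  'password' = '******'," +
    "  'database-name' = 'hand'," +
    "  'table-name' = 'hand'" +    // must name an existing table in database `hand`
    ")");
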
At 2022-06-09 17:48:01, "1223681919" <1223681...@qq.com.INVALID> wrote:
>
>
>
>[flink-akka.actor.default-dispatcher-6] ERROR 
>org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 
>'flink-akka.actor.default-dispatcher-6' produced an uncaught exception. 
>Stopping the process...
>java.util.concurrent.CompletionException: 
>org.apache.flink.util.FlinkRuntimeException: Failed to start the operator 
>coordinators
>   at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>   at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708)
>   at 
> java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
>   at 
> java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the 
>operator coordinators
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
>   at 
> java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
>   ... 27 more
>Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover 
>captured tables for enumerator
>   at 
> com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170)
>   at 
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119)
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308)
>   at 
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72)
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094)
>   ... 30 more
>Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, 
>please check your configured database-name: [hand] and table-name: [hand]
>   at 
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
>   at 
> com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161)
>   ... 35 more


Re: (no subject)

2022-06-09 Post by Xuyang



Hi, to unsubscribe, please send any message to user-zh-unsubscr...@flink.apache.org










--

Best!
Xuyang





On 2022-06-09 16:12:11, "zehir.tong" wrote:
>Unsubscribe


Re: Unsubscribe

2022-06-09 Post by Xuyang
Hi, to unsubscribe, please send any message to user-zh-unsubscr...@flink.apache.org







--

Best!
Xuyang





On 2022-06-09 13:18:55, "高亮" wrote:
>Unsubscribe


flinkcdc

2022-06-09 Post by 1223681919



[flink-akka.actor.default-dispatcher-6] ERROR 
org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 
'flink-akka.actor.default-dispatcher-6' produced an uncaught exception. 
Stopping the process...
java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Failed to start the operator 
coordinators
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708)
at 
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
at 
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
at 
org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
at 
org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
at 
org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the 
operator coordinators
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
at 
org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
at 
java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
... 27 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover 
captured tables for enumerator
at 
com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170)
at 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119)
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308)
at 
org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72)
at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094)
... 30 more
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, 
please check your configured database-name: [hand] and table-name: [hand]
at 
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
at 
com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161)
... 35 more

(no subject)

2022-06-09 Post by zehir.tong
Unsubscribe
