Re:Re:Re:Re: Flink: data loss when using interval join
What I don't understand is this: the time window I open for the interval join is longer than the state retention time I set in the SQL job. The lower and upper bounds are -10s and 20s respectively, so why is data lost? In the SQL job I set the table.exec.state.ttl parameter to 20s, so in theory both streams should also be keeping 20s of data in state for the join. I'm not sure whether my understanding is off and would appreciate an answer.

On 2022-06-10 14:15:29, "Xuyang" wrote:
> [...]
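A note on the two state mechanisms being compared here: table.exec.state.ttl only controls state retention in Table/SQL operators, while the DataStream intervalJoin buffers records in its own state and cleans them up as the event-time watermark passes the interval bounds, so a record arriving behind the watermark can be dropped no matter what the TTL is set to. Below is a minimal sketch of making the allowed out-of-orderness explicit instead of relying on the raw Kafka record timestamps; the 30-second bound and the variable names are assumptions, not from the original job:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Sketch: tolerate up to 30s of out-of-order data before records are
// considered late; the bound should match how disordered the topics really are.
WatermarkStrategy<OrderHeader> headerWatermarks =
        WatermarkStrategy.<OrderHeader>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                .withTimestampAssigner((event, kafkaTimestamp) -> kafkaTimestamp); // keep the Kafka timestamp
SingleOutputStreamOperator<OrderHeader> headerWithWatermarks =
        headerFilterStream.assignTimestampsAndWatermarks(headerWatermarks);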
Re:Re:Re: Flink: data loss when using interval join
Hi, this SQL of yours is not an interval join; it is a regular join. For how to use interval joins, see the documentation [1]. You could try a SQL interval join and see whether it loses data (remember to set the state TTL), to determine whether the problem lies with the data or with the DataStream API.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#interval-joins

--

Best!
Xuyang

On 2022-06-10 11:26:33, "lxk" wrote:
> [...]
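For reference, a sketch of what the suggested SQL interval join could look like on the header/item views from this thread, using the same -10s/+20s bounds as the DataStream job. The event-time attribute rowtime is an assumption; the original code does not declare one on either view:

// Sketch only: rowtime must be a declared event-time attribute on both views.
Table result = streamTableEnvironment.sqlQuery(
        "SELECT header.id, header.customer_id, item.order_id, item.goods_id" +
        " FROM header, item" +
        " WHERE header.id = item.order_id" +
        " AND item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND" +
        " AND header.rowtime + INTERVAL '20' SECOND");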
Re:Re: Flink: data loss when using interval join
The code I used is as follows:

String s = streamTableEnvironment.explainSql("select header.customer_id" +
    ",item.goods_id" +
    ",header.id" +
    ",header.order_status" +
    ",header.shop_id" +
    ",header.parent_order_id" +
    ",header.order_at" +
    ",header.pay_at" +
    ",header.channel_id" +
    ",header.root_order_id" +
    ",item.id" +
    ",item.row_num" +
    ",item.p_sp_sub_amt" +
    ",item.display_qty" +
    ",item.qty" +
    ",item.bom_type" +
    " from header JOIN item on header.id = item.order_id");

System.out.println("explain:" + s);

The plan is:

explain:== Abstract Syntax Tree ==
LogicalProject(customer_id=[$2], goods_id=[$15], id=[$0], order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$12], row_num=[$14], p_sp_sub_amt=[$19], display_qty=[$22], qty=[$17], bom_type=[$20])
+- LogicalJoin(condition=[=($0, $13)], joinType=[inner])
   :- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]])
   +- LogicalTableScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]])

== Optimized Physical Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[=(id, order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

== Optimized Execution Plan ==
Calc(select=[customer_id, goods_id, id, order_status, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, row_num, p_sp_sub_amt, display_qty, qty, bom_type])
+- Join(joinType=[InnerJoin], where=[(id = order_id)], select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, id0, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[id]])
   :  +- Calc(select=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id])
   :     +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_5]], fields=[id, order_status, customer_id, shop_id, parent_order_id, order_at, pay_at, channel_id, root_order_id, last_updated_at, business_flag, mysql_op_type])
   +- Exchange(distribution=[hash[order_id]])
      +- Calc(select=[id, order_id, row_num, goods_id, qty, p_sp_sub_amt, bom_type, display_qty])
         +- TableSourceScan(table=[[default_catalog, default_database, Unregistered_DataStream_Source_8]], fields=[id, order_id, row_num, goods_id, s_sku_code, qty, p_paid_sub_amt, p_sp_sub_amt, bom_type, last_updated_at, display_qty, is_first_flag])

At 2022-06-10 11:02:56, "Shengkai Fang" wrote:
> [...]
Re: Flink: data loss when using interval join
Hello, could you provide the concrete plan so everyone can take a look?

You can directly use tEnv.executeSql("Explain JSON_EXECUTION_PLAN ").print() to print the relevant information.

Best,
Shengkai

lxk wrote on Fri, Jun 10, 2022 at 10:29:
> [...]
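If assembling the EXPLAIN string is awkward, here is a sketch of an equivalent Table API call; the query variable is an assumption standing in for the join query from this thread:

import org.apache.flink.table.api.ExplainDetail;

// Sketch: explainSql with JSON_EXECUTION_PLAN returns the same plan details
// as the SQL EXPLAIN statement suggested above.
String plan = streamTableEnvironment.explainSql(query, ExplainDetail.JSON_EXECUTION_PLAN);
System.out.println(plan);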
Flink: data loss when using interval join
Flink version: 1.14.4
We are currently using a Flink interval join to correlate data. While testing we found a problem: after the interval join, data is lost; but with the SQL API, doing the join directly, the data is fine and nothing is lost.
Watermarks are generated directly from the timestamps carried by the Kafka records.

Here is the code --- interval join:

SingleOutputStreamOperator<HeaderFull> headerFullStream =
    headerFilterStream.keyBy(data -> data.getId())
        .intervalJoin(filterItemStream.keyBy(data -> data.getOrder_id()))
        .between(Time.seconds(-10), Time.seconds(20))
        .process(new ProcessJoinFunction<OrderHeader, OrderItem, HeaderFull>() {
            @Override
            public void processElement(OrderHeader left, OrderItem right, Context context, Collector<HeaderFull> collector) throws Exception {
                HeaderFull headerFull = new HeaderFull();
                BeanUtilsBean beanUtilsBean = new BeanUtilsBean();
                beanUtilsBean.copyProperties(headerFull, left);
                beanUtilsBean.copyProperties(headerFull, right);
                String event_date = left.getOrder_at().substring(0, 10);
                headerFull.setEvent_date(event_date);
                headerFull.setItem_id(right.getId());
                collector.collect(headerFull);
            }
        });

Join using SQL:

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15 s");
conf.setString("table.exec.mini-batch.size","100");
conf.setString("table.exec.state.ttl","20 s");
env.configure(conf);
Table headerTable = streamTableEnvironment.fromDataStream(headerFilterStream);
Table itemTable = streamTableEnvironment.fromDataStream(filterItemStream);

streamTableEnvironment.createTemporaryView("header", headerTable);
streamTableEnvironment.createTemporaryView("item", itemTable);

Table result = streamTableEnvironment.sqlQuery("select header.customer_id" +
    ",item.goods_id" +
    ",header.id" +
    ",header.order_status" +
    ",header.shop_id" +
    ",header.parent_order_id" +
    ",header.order_at" +
    ",header.pay_at" +
    ",header.channel_id" +
    ",header.root_order_id" +
    ",item.id" +
    ",item.row_num" +
    ",item.p_sp_sub_amt" +
    ",item.display_qty" +
    ",item.qty" +
    ",item.bom_type" +
    " from header JOIN item on header.id = item.order_id");

DataStream<Row> rowDataStream = streamTableEnvironment.toChangelogStream(result);

I don't quite understand why the interval join loses so much data. My understanding was that the SQL join should also be using something similar to an interval join underneath, so why is the difference between the final joined results of the two approaches so large?
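One hedged observation on the conversion above: fromDataStream(stream) without a schema does not carry the stream-record (Kafka) timestamps into the table as an event-time attribute, so the resulting views cannot be used in a SQL interval join as-is. A sketch of declaring the timestamp and watermark explicitly; the column name rowtime is an assumption:

import org.apache.flink.table.api.Schema;

// Sketch: expose the stream-record timestamp as an event-time column and
// reuse the watermarks already present on the DataStream.
Table headerTable = streamTableEnvironment.fromDataStream(
        headerFilterStream,
        Schema.newBuilder()
                .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
                .watermark("rowtime", "SOURCE_WATERMARK()")
                .build());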
Re:flinkcdc
Hi, please check your DDL.

At 2022-06-09 17:48:01, "1223681919" <1223681...@qq.com.INVALID> wrote:
> [...]
Re: (no subject)
Hi, to unsubscribe, please send any message to user-zh-unsubscr...@flink.apache.org

--

Best!
Xuyang

On 2022-06-09 16:12:11, "zehir.tong" wrote:
>Unsubscribe
Re: Unsubscribe
Hi, to unsubscribe, please send any message to user-zh-unsubscr...@flink.apache.org

--

Best!
Xuyang

On 2022-06-09 13:18:55, "高亮" wrote:
>Unsubscribe
flinkcdc
[flink-akka.actor.default-dispatcher-6] ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler - FATAL: Thread 'flink-akka.actor.default-dispatcher-6' produced an uncaught exception. Stopping the process...
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:708)
    at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717)
    at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010)
    at org.apache.flink.runtime.jobmaster.JobMaster.resetAndStartScheduler(JobMaster.java:935)
    at org.apache.flink.runtime.jobmaster.JobMaster.startJobExecution(JobMaster.java:801)
    at org.apache.flink.runtime.jobmaster.JobMaster.lambda$start$1(JobMaster.java:357)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleCallAsync(AkkaRpcActor.java:383)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:88)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to start the operator coordinators
    at org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1100)
    at org.apache.flink.runtime.scheduler.SchedulerBase.startScheduling(SchedulerBase.java:567)
    at org.apache.flink.runtime.jobmaster.JobMaster.startScheduling(JobMaster.java:944)
    at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705)
    ... 27 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to discover captured tables for enumerator
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:170)
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.start(SourceCoordinator.java:119)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.applyCall(RecreateOnResetOperatorCoordinator.java:308)
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.start(RecreateOnResetOperatorCoordinator.java:72)
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.start(OperatorCoordinatorHolder.java:182)
    at org.apache.flink.runtime.scheduler.SchedulerBase.startAllOperatorCoordinators(SchedulerBase.java:1094)
    ... 30 more
Caused by: java.lang.IllegalArgumentException: Can't find any matched tables, please check your configured database-name: [hand] and table-name: [hand]
    at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.discoverCapturedTables(DebeziumUtils.java:167)
    at com.ververica.cdc.connectors.mysql.source.MySqlSource.createEnumerator(MySqlSource.java:161)
    ... 35 more
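The root cause is the last one in the trace: the connector could not find any table matching the configured database-name [hand] and table-name [hand], i.e. there is no table hand.hand in that MySQL instance. Below is a sketch of the kind of DDL the reply earlier in this digest refers to; every connection detail is a placeholder and the column and table names (hand_source, orders) are assumptions. The point is only that 'database-name' and 'table-name' must name a table that actually exists:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Sketch: 'database-name' and 'table-name' must match a real MySQL table,
// otherwise table discovery fails in createEnumerator as shown above.
tableEnv.executeSql(
        "CREATE TABLE hand_source (" +
        "  id BIGINT," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = 'localhost'," +
        "  'port' = '3306'," +
        "  'username' = 'user'," +
        "  'password' = 'password'," +
        "  'database-name' = 'hand'," +
        "  'table-name' = 'orders'" +
        ")");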
(no subject)
Unsubscribe
(no subject)
Unsubscribe