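The problem with how this error message is displayed has already been fixed for a later release. Once the new version is out, upgrading will let you see directly from the error message which TableFactory implementations conflict: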

https://issues.apache.org/jira/browse/FLINK-20186 
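
To illustrate how an error message can end up printing `java.util.LinkedList` once per candidate, here is a minimal sketch. It is not the Flink source; the class and method names (`AmbiguousMessageDemo`, `buggyNames`, `fixedNames`) are hypothetical, and I am only assuming that the message is built by mapping over the list of matching factories. If the mapping names the class of the list instead of the class of each element, the output matches what was reported:

```java
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; this is not code from Flink.
final class AmbiguousMessageDemo {

    // Buggy variant: the lambda ignores its argument and names the list's own class,
    // so the same collection class name is repeated once per candidate.
    static String buggyNames(List<Object> candidates) {
        return candidates.stream()
                .map(c -> candidates.getClass().getName())
                .sorted()
                .collect(Collectors.joining("\n"));
    }

    // Intended variant: name the class of each individual candidate.
    static String fixedNames(List<Object> candidates) {
        return candidates.stream()
                .map(c -> c.getClass().getName())
                .sorted()
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        // Stand-ins for the discovered factories; any objects will do here.
        List<Object> candidates = new LinkedList<>();
        candidates.add("a");
        candidates.add(1);
        candidates.add(2.0);
        candidates.add('c');

        System.out.println(buggyNames(candidates));
        System.out.println("----");
        System.out.println(fixedNames(candidates));
    }
}
```

With four candidates, the buggy variant prints the list's class name four times, which is consistent with the four `java.util.LinkedList` lines in the reported trace.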
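
Until the fix is available, it may also help to check which jars on the classpath register factories at all. `ServiceLoader` discovers implementations through `META-INF/services/<interface name>` files, so listing every such file for Flink's `Factory` interface shows which jars contribute entries. The following is a minimal sketch using only the JDK. The names `ServiceFileLocator` and `locate` are my own illustrative choices, and I am assuming the interface is `org.apache.flink.table.factories.Factory`, as in the snippet quoted below:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; not part of Flink.
final class ServiceFileLocator {

    // Returns the URL of every META-INF/services/<serviceName> file
    // that the given class loader can see.
    static List<URL> locate(String serviceName, ClassLoader classLoader) {
        try {
            return Collections.list(
                    classLoader.getResources("META-INF/services/" + serviceName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumption: this is the interface that Flink's factory discovery loads.
        String service = "org.apache.flink.table.factories.Factory";
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        for (URL url : locate(service, classLoader)) {
            System.out.println(url);
        }
    }
}
```

If two of the printed URLs point to jars that both list a JDBC table factory (for example a connector jar and a fat jar that bundles the same connector), that would be a likely source of the duplicate.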



> On Dec 3, 2020, at 20:08, Wei Zhong <[email protected]> wrote:
> 
> Hi,
> 
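> The code that looks up the TableFactory currently seems to have a problem in how it reports the error: the real class names are not shown. For now you can run the following code by hand to see which classes are being treated as the JDBC DynamicTableSinkFactory: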
> 
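> import java.util.LinkedList;
> import java.util.List;
> import java.util.ServiceLoader;
> import java.util.stream.Collectors;
> import org.apache.flink.table.factories.DynamicTableSinkFactory;
> import org.apache.flink.table.factories.Factory;
> 
> List<Factory> result = new LinkedList<>();
> ServiceLoader
>     .load(Factory.class, Thread.currentThread().getContextClassLoader())
>     .iterator()
>     .forEachRemaining(result::add);
> // Keep only the sink factories registered under the identifier "jdbc".
> List<Factory> jdbcResult = result.stream()
>     .filter(f -> f instanceof DynamicTableSinkFactory)
>     .filter(f -> "jdbc".equals(f.factoryIdentifier()))
>     .collect(Collectors.toList());
> System.out.println(jdbcResult);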
> 
> 
>> On Dec 3, 2020, at 19:50, hailongwang <[email protected]> wrote:
>> 
>> Hi,
>> Which version are you using? Do you have a custom connector that implements DynamicTableSinkFactory and whose factoryIdentifier method returns `jdbc`?
>> 
>> 
>> Best,
>> Hailong
>> At 2020-12-03 14:44:18, "xuzh" <[email protected]> wrote:
>>> Error:
>>> 
>>> 
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>>> factories for identifier 'jdbc' that implement 
>>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>>> classpath
>>> 
>>> 
>>> As I read it, two identical classes were found: DynamicTableSinkFactory
>>> 
>>> 
>>> The code is as follows:
>>> package org.apache.flink.examples;
>>> 
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>> 
>>> public class CDC2ss2 {
>>>     public static void main(String[] args) throws Exception {
>>> 
>>>         // set up execution environment
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tEnv;
>>> 
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>         tEnv = StreamTableEnvironment.create(env, settings);
>>>         String src_sql = "CREATE TABLE userss (\n" +
>>>                 "    user_id INT,\n" +
>>>                 "    user_nm STRING\n" +
>>>                 ") WITH (\n" +
>>>                 "    'connector' = 'mysql-cdc',\n" +
>>>                 "    'hostname' = '10.12.5.37',\n" +
>>>                 "    'port' = '3306',\n" +
>>>                 "    'username' = 'dps',\n" +
>>>                 "    'password' = 'dps1234',\n" +
>>>                 "    'database-name' = 'rpt',\n" +
>>>                 "    'table-name' = 'users'\n" +
>>>                 ")";
>>> 
>>>         tEnv.executeSql(src_sql); // create the source table
>>> 
>>>         String sink = "CREATE TABLE sink (\n" +
>>>                 "    user_id INT,\n" +
>>>                 "    user_nm STRING,\n" +
>>>                 "    primary key(user_id) NOT ENFORCED\n" +
>>>                 ") WITH (\n" +
>>>                 "    'connector' = 'jdbc',\n" +
>>>                 "    'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>>>                 "    'username' = 'dps',\n" +
>>>                 "    'password' = 'dps1234',\n" +
>>>                 "    'table-name' = 'sink'\n" +
>>>                 ")";
>>>         String to_print_sql = "insert into sink select user_id, user_nm from userss";
>>>         tEnv.executeSql(sink);
>>>         tEnv.executeSql(to_print_sql);
>>>         env.execute();
>>>     }
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> Full error:
>>> 
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>>> 
>>> Table options are:
>>> 
>>> 'connector'='jdbc'
>>> 'password'='dps1234'
>>> 'table-name'='sink'
>>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>>> 'username'='dps'
>>>     at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>>>     at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>>> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
>>>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>>>     at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>>>     ... 18 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>>> 
>>> 
>>> Ambiguous factory classes are:
>>> 
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> java.util.LinkedList
>>>     at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>>>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>>>     ... 19 more
>>> 
>>> 
>>> Process finished with exit code 1
> 
