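This problem with how the error message is displayed has already been fixed for a later release. Once the new version is out, upgrading will let you see directly from the error message which TableFactory implementations conflict: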
https://issues.apache.org/jira/browse/FLINK-20186

> On Dec 3, 2020, at 20:08, Wei Zhong <[email protected]> wrote:
>
> Hi,
>
> The current TableFactory lookup code seems to have a problem in how it displays the error message: the real class names are not shown. For now, you can run the following code manually to see which classes are being treated as the JDBC DynamicTableSinkFactory:
>
>     List<Factory> result = new LinkedList<>();
>     ServiceLoader
>         .load(Factory.class, Thread.currentThread().getContextClassLoader())
>         .iterator()
>         .forEachRemaining(result::add);
>     List<Factory> jdbcResult = result.stream()
>         .filter(f -> DynamicTableSinkFactory.class.isAssignableFrom(f.getClass()))
>         .filter(f -> f.factoryIdentifier().equals("jdbc"))
>         .collect(Collectors.toList());
>     System.out.println(jdbcResult);
>
>> On Dec 3, 2020, at 19:50, hailongwang <[email protected]> wrote:
>>
>> Hi,
>> Which version are you using? Do you have a connector of your own that implements DynamicTableSinkFactory and whose factoryIdentifier method returns `jdbc`?
>>
>> Best,
>> Hailong
>>
>> At 2020-12-03 14:44:18, "xuzh" <[email protected]> wrote:
>>> Error:
>>>
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
>>>
>>> It looks like two identical DynamicTableSinkFactory classes were found.
>>>
>>> The code is as follows:
>>>
>>> package org.apache.flink.examples;
>>>
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>>
>>> public class CDC2ss2 {
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         // set up execution environment
>>>         StreamExecutionEnvironment env =
>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tEnv;
>>>
>>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>         tEnv = StreamTableEnvironment.create(env, settings);
>>>         String src_sql = "CREATE TABLE userss (\n" +
>>>                 " user_id INT,\n" +
>>>                 " user_nm STRING\n" +
>>>                 ") WITH (\n" +
>>>                 " 'connector' = 'mysql-cdc',\n" +
>>>                 " 'hostname' = '10.12.5.37',\n" +
>>>                 " 'port' = '3306',\n" +
>>>                 " 'username' = 'dps',\n" +
>>>                 " 'password' = 'dps1234',\n" +
>>>                 " 'database-name' = 'rpt',\n" +
>>>                 " 'table-name' = 'users'\n" +
>>>                 " )";
>>>
>>>         tEnv.executeSql(src_sql); // create the table
>>>
>>>         String sink = "CREATE TABLE sink (\n" +
>>>                 " user_id INT,\n" +
>>>                 " user_nm STRING,\n" +
>>>                 " primary key(user_id) NOT ENFORCED \n" +
>>>                 ") WITH (\n" +
>>>                 " 'connector' = 'jdbc',\n" +
>>>                 " 'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>>>                 " 'username' = 'dps',\n" +
>>>                 " 'password' = 'dps1234',\n" +
>>>                 " 'table-name' = 'sink'\n" +
>>>                 " )";
>>>         String to_print_sql = "insert into sink select user_id ,user_nm from userss";
>>>         tEnv.executeSql(sink);
>>>         tEnv.executeSql(to_print_sql);
>>>         env.execute();
>>>     }
>>> }
>>>
>>> Full error:
>>>
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>>>
>>> Table options are:
>>>
>>> 'connector'='jdbc'
>>> 'password'='dps1234'
>>> 'table-name'='sink'
>>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>>> 'username'='dps'
>>>     at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>     at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>     at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>     at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>>>     at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>>> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
>>>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>>>     at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>>>     ... 18 more
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>>>
>>> Ambiguous factory classes are:
>>>
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> java.util.LinkedList
>>> java.util.LinkedList
>>>     at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>>>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>>>     ... 19 more
>>>
>>> Process finished with exit code 1
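
A complementary check, offered only as a rough sketch: since the factories are discovered through `ServiceLoader`, each jar that contributes one normally ships a `META-INF/services/org.apache.flink.table.factories.Factory` file. Listing every copy of that file on the classpath may show which jars register a JDBC factory more than once. The class name `ServiceFileLocator` and the `locate` helper are hypothetical names made up for this illustration; only JDK calls are used, and nothing here is part of Flink.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: not part of Flink, JDK classes only.
public class ServiceFileLocator {

    // Maps each classpath location of the service file to the
    // implementation class names listed inside it.
    static Map<String, List<String>> locate(String serviceInterface, ClassLoader cl)
            throws IOException {
        Map<String, List<String>> found = new LinkedHashMap<>();
        Enumeration<URL> urls = cl.getResources("META-INF/services/" + serviceInterface);
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            List<String> impls = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Service files allow '#' comments; drop them.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        impls.add(line);
                    }
                }
            }
            found.put(url.toString(), impls);
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the interface passed to ServiceLoader in the thread above.
        String service = args.length > 0
                ? args[0]
                : "org.apache.flink.table.factories.Factory";
        Map<String, List<String>> found =
                locate(service, Thread.currentThread().getContextClassLoader());
        if (found.isEmpty()) {
            System.out.println("No service file found for " + service);
        }
        for (Map.Entry<String, List<String>> entry : found.entrySet()) {
            System.out.println(entry.getKey());
            for (String impl : entry.getValue()) {
                System.out.println("    " + impl);
            }
        }
    }
}
```

Run it with the same classpath as the failing job. If the same JDBC factory class name shows up under several jar locations, the duplicate dependency is the likely culprit, though this only inspects service files and does not check `factoryIdentifier()` itself.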
