Hi, 你用的是哪个版本呢?有没有自己实现了 DynamicTableSinkFactory、并且 factoryIdentifier 方法返回 `jdbc` 的 Connector?
Best, Hailong 在 2020-12-03 14:44:18,"xuzh" <[email protected]> 写道: >错误: > > >Caused by: org.apache.flink.table.api.ValidationException: Multiple factories >for identifier 'jdbc' that implement >'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the >classpath > > >看意思是找到了两个一样的类:DynamicTableSinkFactory > > >代码如下: >package org.apache.flink.examples; > > >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.table.api.EnvironmentSettings; >import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >import org.apache.flink.table.factories.DynamicTableSinkFactory; > > >public class CDC2ss2 { > public static void main(String[] args) throws Exception { > > > // set up execution environment > StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tEnv; > > > EnvironmentSettings settings = >EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > tEnv = StreamTableEnvironment.create(env, >settings); > String src_sql = "CREATE TABLE userss (\n" + > " > user_id INT,\n" + > " > user_nm STRING\n" + > ") WITH (\n" + > " >'connector' = 'mysql-cdc',\n" + > " >'hostname' = '10.12.5.37',\n" + > " >'port' = '3306',\n" + > " >'username' = 'dps',\n" + > " >'password' = 'dps1234',\n" + > " >'database-name' = 'rpt',\n" + > " >'table-name' = 'users'\n" + > " >)"; > > > tEnv.executeSql(src_sql); // 创建表 > > > String sink="CREATE TABLE sink (\n" + > " > user_id INT,\n" + > " > user_nm STRING,\n" + > " > primary key(user_id) NOT ENFORCED \n" + > ") WITH (\n" + > " >'connector' = 'jdbc',\n" + > " >'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" + > " >'username' = 'dps',\n" + > " >'password' = 'dps1234',\n" + > " >'table-name' = 'sink'\n" + > " >)"; > String to_print_sql="insert into sink select >user_id ,user_nm from userss"; > tEnv.executeSql(sink); > tEnv.executeSql(to_print_sql); > env.execute(); > } > > >} > > 
> > > >详细错误: > > >Exception in thread "main" org.apache.flink.table.api.ValidationException: >Unable to create a sink for writing table >'default_catalog.default_database.sink'. > > >Table options are: > > >'connector'='jdbc' >'password'='dps1234' >'table-name'='sink' >'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false' >'username'='dps' > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690) > at 
org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50) >Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a >connector using option ''connector'='jdbc''. > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > ... 18 more >Caused by: org.apache.flink.table.api.ValidationException: Multiple factories >for identifier 'jdbc' that implement >'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the >classpath. > > >Ambiguous factory classes are: > > >java.util.LinkedList >java.util.LinkedList >java.util.LinkedList >java.util.LinkedList > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253) > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) > ... 19 more > > >Process finished with exit code 1
