Hi,
Which version are you using? Do you have a connector of your own on the classpath that implements DynamicTableSinkFactory and whose factoryIdentifier method returns `jdbc`?
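
One way to check for a duplicate is to list every copy of the factory service file on the classpath. Flink discovers table factories through Java's ServiceLoader, so each jar that registers a factory should ship a META-INF/services/org.apache.flink.table.factories.Factory file. The sketch below uses only the JDK; the class name FactoryServiceFileScanner and its methods are made up for illustration and are not part of Flink. If the JDBC factory class shows up under more than one URL, that jar (or a shaded copy of it) is probably on the classpath twice.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: lists which jars register table factories.
final class FactoryServiceFileScanner {

    // Service file through which table factories are registered (Java SPI).
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    // Strips comments and blank lines from the lines of a service file.
    public static List<String> parseProviderLines(List<String> lines) {
        List<String> providers = new ArrayList<>();
        for (String line : lines) {
            int hash = line.indexOf('#');
            String name = (hash >= 0 ? line.substring(0, hash) : line).trim();
            if (!name.isEmpty()) {
                providers.add(name);
            }
        }
        return providers;
    }

    // Maps each copy of the service file on the classpath to the classes it declares.
    public static Map<String, List<String>> scan(ClassLoader cl, String serviceFile)
            throws IOException {
        Map<String, List<String>> result = new LinkedHashMap<>();
        Enumeration<URL> urls = cl.getResources(serviceFile);
        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();
            List<String> lines = new ArrayList<>();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
            result.put(url.toString(), parseProviderLines(lines));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        for (Map.Entry<String, List<String>> e : scan(cl, SERVICE_FILE).entrySet()) {
            System.out.println(e.getKey()); // jar (or directory) that ships the file
            for (String provider : e.getValue()) {
                System.out.println("    " + provider); // factory class it registers
            }
        }
    }
}
```

Run it with the same classpath as the failing job (for example from the same IDE module) and compare the jars printed for the JDBC factory.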


Best,
Hailong
On 2020-12-03 14:44:18, "xuzh" <[email protected]> wrote:
>Error:
>
>
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
>
>
>It looks like two identical classes were found: DynamicTableSinkFactory
>
>
>The code is as follows:
>package org.apache.flink.examples;
>
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.factories.DynamicTableSinkFactory;
>
>
>public class CDC2ss2 {
>    public static void main(String[] args) throws Exception {
>
>        // set up execution environment
>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>        StreamTableEnvironment tEnv;
>
>        EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                .useBlinkPlanner()
>                .inStreamingMode()
>                .build();
>        tEnv = StreamTableEnvironment.create(env, settings);
>        String src_sql = "CREATE TABLE userss (\n" +
>                "     user_id INT,\n" +
>                "     user_nm STRING\n" +
>                ") WITH (\n" +
>                "      'connector' = 'mysql-cdc',\n" +
>                "      'hostname' = '10.12.5.37',\n" +
>                "      'port' = '3306',\n" +
>                "      'username' = 'dps',\n" +
>                "      'password' = 'dps1234',\n" +
>                "      'database-name' = 'rpt',\n" +
>                "      'table-name' = 'users'\n" +
>                "      )";
>
>
>        tEnv.executeSql(src_sql); // create the table
>
>
>        String sink = "CREATE TABLE sink (\n" +
>                "     user_id INT,\n" +
>                "     user_nm STRING,\n" +
>                "     primary key(user_id) NOT ENFORCED\n" +
>                ") WITH (\n" +
>                "      'connector' = 'jdbc',\n" +
>                "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>                "      'username' = 'dps',\n" +
>                "      'password' = 'dps1234',\n" +
>                "      'table-name' = 'sink'\n" +
>                "      )";
>        String to_print_sql = "insert into sink select user_id, user_nm from userss";
>        tEnv.executeSql(sink);
>        tEnv.executeSql(to_print_sql);
>        env.execute();
>    }
>
>
>}
>
>
>
>
>
>Detailed error:
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>
>
>Table options are:
>
>
>'connector'='jdbc'
>'password'='dps1234'
>'table-name'='sink'
>'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>'username'='dps'
>       at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>       at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>       at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
>       at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>       at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>       ... 18 more
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>
>
>Ambiguous factory classes are:
>
>
>java.util.LinkedList
>java.util.LinkedList
>java.util.LinkedList
>java.util.LinkedList
>       at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>       at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>       ... 19 more
>
>
>Process finished with exit code 1
