??????

Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
for identifier 'jdbc' that implement 
'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
classpath


????????????????????????????DynamicTableSinkFactory


??????????
package org.apache.flink.examples;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.DynamicTableSinkFactory;


public class CDC2ss2 {
    public static void main(String[] args) throws Exception {


        // set up execution environment
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv;


        EnvironmentSettings settings = 
EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        tEnv = StreamTableEnvironment.create(env, settings);
        String src_sql = "CREATE TABLE userss (\n" +
                "    
 user_id INT,\n" +
                "    
 user_nm STRING\n" +
                ") WITH (\n" +
                "      
'connector' = 'mysql-cdc',\n" +
                "      
'hostname' = '10.12.5.37',\n" +
                "      
'port' = '3306',\n" +
                "      
'username' = 'dps',\n" +
                "      
'password' = 'dps1234',\n" +
                "      
'database-name' = 'rpt',\n" +
                "      
'table-name' = 'users'\n" +
                "      
)";


        tEnv.executeSql(src_sql); // ??????


        String sink="CREATE TABLE sink (\n" +
                "    
 user_id INT,\n" +
                "    
 user_nm STRING,\n" +
                "    
 primary key(user_id)  NOT ENFORCED \n" +
                ") WITH (\n" +
                "      
'connector' = 'jdbc',\n" +
                "      
'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
                "      
'username' = 'dps',\n" +
                "      
'password' = 'dps1234',\n" +
                "      
'table-name' = 'sink'\n" +
                "      
)";
        String to_print_sql="insert into sink select 
user_id  ,user_nm   from userss";
         tEnv.executeSql(sink);
        tEnv.executeSql(to_print_sql);
        env.execute();
    }


}





??????????


Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a sink for writing table 
'default_catalog.default_database.sink'.


Table options are:


'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
        at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
connector using option ''connector'='jdbc''.
        at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
        ... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
for identifier 'jdbc' that implement 
'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
classpath.


Ambiguous factory classes are:


java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
        at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
        at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
        ... 19 more


Process finished with exit code 1

回复