Error message:
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories
for identifier 'jdbc' that implement
'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the
classpath
????????????????????????????DynamicTableSinkFactory
Code:
package org.apache.flink.examples;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
public class CDC2ss2 {
public static void main(String[] args) throws Exception {
// set up execution environment
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv;
EnvironmentSettings settings =
EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
tEnv = StreamTableEnvironment.create(env, settings);
String src_sql = "CREATE TABLE userss (\n" +
"
user_id INT,\n" +
"
user_nm STRING\n" +
") WITH (\n" +
"
'connector' = 'mysql-cdc',\n" +
"
'hostname' = '10.12.5.37',\n" +
"
'port' = '3306',\n" +
"
'username' = 'dps',\n" +
"
'password' = 'dps1234',\n" +
"
'database-name' = 'rpt',\n" +
"
'table-name' = 'users'\n" +
"
)";
tEnv.executeSql(src_sql); // ??????
String sink="CREATE TABLE sink (\n" +
"
user_id INT,\n" +
"
user_nm STRING,\n" +
"
primary key(user_id) NOT ENFORCED \n" +
") WITH (\n" +
"
'connector' = 'jdbc',\n" +
"
'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
"
'username' = 'dps',\n" +
"
'password' = 'dps1234',\n" +
"
'table-name' = 'sink'\n" +
"
)";
String to_print_sql="insert into sink select
user_id ,user_nm from userss";
tEnv.executeSql(sink);
tEnv.executeSql(to_print_sql);
env.execute();
}
}
Full error output:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Unable to create a sink for writing table
'default_catalog.default_database.sink'.
Table options are:
'connector'='jdbc'
'password'='dps1234'
'table-name'='sink'
'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
'username'='dps'
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='jdbc''.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 18 more
Caused by: org.apache.flink.table.api.ValidationException: Multiple factories
for identifier 'jdbc' that implement
'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the
classpath.
Ambiguous factory classes are:
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
java.util.LinkedList
at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 19 more
Process finished with exit code 1