Hi,
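The current TableFactory lookup code seems to have a problem in how it reports this error: the real class names are not shown. As a first step, you can manually run the following code to see exactly which classes are being treated as the JDBC DynamicTableSinkFactory: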
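import java.util.LinkedList;
import java.util.List;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.Factory;

// Load every Factory visible to the context class loader.
List<Factory> result = new LinkedList<>();
ServiceLoader
    .load(Factory.class, Thread.currentThread().getContextClassLoader())
    .iterator()
    .forEachRemaining(result::add);
// Keep only the sink factories registered under the identifier "jdbc".
List<Factory> jdbcResult = result.stream()
    .filter(f -> DynamicTableSinkFactory.class.isAssignableFrom(f.getClass()))
    .filter(f -> f.factoryIdentifier().equals("jdbc"))
    .collect(Collectors.toList());
System.out.println(jdbcResult);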
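
As a complementary check that does not need to instantiate any factory, it may help to list the ServiceLoader registration files on the classpath directly. The sketch below uses only the JDK. It assumes the connector jars register their factories in `META-INF/services/org.apache.flink.table.factories.Factory`, and the class and method names (`FactoryServiceFiles`, `listProviders`) are made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink.
class FactoryServiceFiles {

    // Assumption: this is the service file that Flink connector jars ship.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    /** Returns one "file URL -> provider class name" entry per registered provider. */
    static List<String> listProviders(ClassLoader classLoader, String resource) {
        List<String> entries = new ArrayList<>();
        try {
            // Every jar (or directory) on the classpath may contribute its own copy of the file.
            for (URL url : Collections.list(classLoader.getResources(resource))) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // In service files, '#' starts a comment that runs to the end of the line.
                        int hash = line.indexOf('#');
                        if (hash >= 0) {
                            line = line.substring(0, hash);
                        }
                        line = line.trim();
                        if (!line.isEmpty()) {
                            entries.add(url + " -> " + line);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return entries;
    }

    public static void main(String[] args) {
        listProviders(Thread.currentThread().getContextClassLoader(), SERVICE_FILE)
                .forEach(System.out::println);
    }
}
```

If the output shows a JDBC factory class registered from more than one jar, or two different JDBC factory classes, those jars are the likely source of the "Multiple factories for identifier 'jdbc'" error.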
> On Dec 3, 2020, at 19:50, hailongwang <[email protected]> wrote:
>
> Hi,
> Which version are you using? Do you have a connector of your own that implements
> DynamicTableSinkFactory and whose factoryIdentifier method returns `jdbc`?
>
>
> Best,
> Hailong
> On 2020-12-03 14:44:18, "xuzh" <[email protected]> wrote:
>> Error:
>>
>>
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple
>> factories for identifier 'jdbc' that implement
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the
>> classpath
>>
>>
>> It looks like two identical classes were found: DynamicTableSinkFactory
>>
>>
>> The code is as follows:
>> package org.apache.flink.examples;
>>
>>
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>
>>
>> public class CDC2ss2 {
>> public static void main(String[] args) throws Exception {
>>
>>
>> // set up execution environment
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv;
>>
>>
>> EnvironmentSettings settings =
>> EnvironmentSettings.newInstance()
>> .useBlinkPlanner()
>> .inStreamingMode()
>> .build();
>> tEnv = StreamTableEnvironment.create(env,
>> settings);
>> String src_sql = "CREATE TABLE userss (\n" +
>> "  user_id INT,\n" +
>> "  user_nm STRING\n" +
>> ") WITH (\n" +
>> "  'connector' = 'mysql-cdc',\n" +
>> "  'hostname' = '10.12.5.37',\n" +
>> "  'port' = '3306',\n" +
>> "  'username' = 'dps',\n" +
>> "  'password' = 'dps1234',\n" +
>> "  'database-name' = 'rpt',\n" +
>> "  'table-name' = 'users'\n" +
>> ")";
>>
>>
>> tEnv.executeSql(src_sql); // create the source table
>>
>>
>> String sink = "CREATE TABLE sink (\n" +
>> "  user_id INT,\n" +
>> "  user_nm STRING,\n" +
>> "  primary key(user_id) NOT ENFORCED\n" +
>> ") WITH (\n" +
>> "  'connector' = 'jdbc',\n" +
>> "  'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>> "  'username' = 'dps',\n" +
>> "  'password' = 'dps1234',\n" +
>> "  'table-name' = 'sink'\n" +
>> ")";
>> String to_print_sql = "insert into sink select user_id, user_nm from userss";
>> tEnv.executeSql(sink);
>> tEnv.executeSql(to_print_sql);
>> env.execute();
>> }
>>
>>
>> }
>>
>>
>>
>>
>>
>> Detailed error:
>>
>>
>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>> Unable to create a sink for writing table
>> 'default_catalog.default_database.sink'.
>>
>>
>> Table options are:
>>
>>
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>> at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
>> connector using option ''connector'='jdbc''.
>> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>> at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>> ... 18 more
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple
>> factories for identifier 'jdbc' that implement
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the
>> classpath.
>>
>>
>> Ambiguous factory classes are:
>>
>>
>> java.util.LinkedList
>> java.util.LinkedList
>> java.util.LinkedList
>> java.util.LinkedList
>> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> ... 19 more
>>
>>
>> Process finished with exit code 1