Hi,
1. OK, good to know.
2. Indeed, after switching some of the Flink dependencies to provided scope, packaging and test runs still work fine. However, a package like flink-walkthrough-common_2.11 does not appear in Flink's lib directory, so it still gets bundled into the jar.
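For reference, the provided scope can be declared like this in the pom (a minimal sketch; flink-java stands in for whichever Flink artifact is being demoted):

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <!-- provided: available at compile time, but not bundled into the
         user jar, since Flink's own classpath already supplies it -->
    <scope>provided</scope>
</dependency>
```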




On 2020-07-27 11:42:50, "Caizhi Weng" <[email protected]> wrote:
>Hi,
>
>Flink's TableFactory relies on Java's service discovery (SPI) mechanism, which is why those two files are needed. You should check whether the
>jar-with-dependencies build actually packages those resource files.
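As a side note, the standard way to make sure those SPI resource files survive fat-jar packaging is the Maven Shade plugin's ServicesResourceTransformer, which merges the META-INF/services files from all dependencies instead of letting one overwrite another (a sketch of the relevant plugin configuration only):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <executions>
        <execution>
            <phase>package</phase>
            <goals><goal>shade</goal></goals>
            <configuration>
                <transformers>
                    <!-- Concatenates META-INF/services files of the same name
                         from every dependency, so all factories stay visible -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
```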
>
>Also, why bundle Flink's dependencies into the fat jar at all? Flink's own classpath already contains them, so when this fat jar is submitted as a
>Flink user jar there is no need to include the Flink dependencies.
>
>RS <[email protected]> wrote on Fri, Jul 24, 2020 at 8:30 PM:
>
>> hi,
>> Thanks for the reply. After trying several times, I found it is probably not a missing-dependency problem.
>>
>>
>> I added a new directory to my project: resources/META-INF/services
>> and copied two files from the Flink source code:
>> org.apache.flink.table.factories.TableFactory and org.apache.flink.table.factories.Factory
>> After that the build no longer fails. I don't quite understand the principle, but it did fix the error.
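The principle can be sketched with plain Java SPI (toy names, not Flink's actual interfaces): ServiceLoader only discovers implementations listed in a META-INF/services/&lt;interface-name&gt; file on the classpath, which is why copying those two files makes the factory lookup succeed.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ServiceLoader;

public class SpiDemo {
    // A toy service interface and implementation, standing in for
    // Flink's org.apache.flink.table.factories.Factory and its connectors.
    public interface Greeter { String greet(); }
    public static class HelloGreeter implements Greeter {
        public String greet() { return "hello"; }
    }

    // ServiceLoader only sees implementations listed in a
    // META-INF/services/<interface-binary-name> file, so we create one
    // in a temp dir and expose it through a classloader.
    public static String firstGreeting() throws IOException {
        Path dir = Files.createTempDirectory("spi");
        Path services = dir.resolve("META-INF/services");
        Files.createDirectories(services);
        Files.write(services.resolve(Greeter.class.getName()),
                HelloGreeter.class.getName().getBytes());

        try (URLClassLoader cl = new URLClassLoader(
                new URL[]{dir.toUri().toURL()},
                SpiDemo.class.getClassLoader())) {
            for (Greeter g : ServiceLoader.load(Greeter.class, cl)) {
                return g.greet();
            }
        }
        // No provider file found: the symptom behind "Could not find any factory"
        return null;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(firstGreeting());
    }
}
```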
>>
>>
>> On 2020-07-24 20:16:18, "JasonLee" <[email protected]> wrote:
>> >hi
>> >Only the -sql and -json packages are needed.
>> >
>> >
>> >JasonLee <[email protected]>
>> >
>> >On 07/24/2020 17:02, RS wrote:
>> >hi,
>> >With Flink 1.11.1, I tried running a SQL DDL job that reads from Kafka, and executing the CREATE statement failed.
>> >The jar was built with jar-with-dependencies.
>> >
>> >
>> >Code snippet:
>> >   public String ddlSql = String.format("CREATE TABLE %s (\n" +
>> >           "  number BIGINT,\n" +
>> >           "  msg STRING,\n" +
>> >           "  username STRING,\n" +
>> >           "  update_time TIMESTAMP(3)\n" +
>> >           ") WITH (\n" +
>> >           " 'connector' = 'kafka',\n" +
>> >           " 'topic' = '%s',\n" +
>> >           " 'properties.bootstrap.servers' = '%s',\n" +
>> >           " 'properties.group.id' = '%s',\n" +
>> >           " 'format' = 'json',\n" +
>> >           " 'json.fail-on-missing-field' = 'false',\n" +
>> >           " 'json.ignore-parse-errors' = 'true'\n" +
>> >           ")\n", tableName, topic, servers, group);
>> >
>> >
>> >       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> >       StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>> >       tableEnv.executeSql(ddlSql);
>> >
>> >
>> >Error message:
>> >Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
>> >Available factory identifiers are:
>> >datagen
>> >at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>> >at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>> >... 33 more
>> >
>> >
>> >I referred to this thread:
>> >http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
>> >and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.
>> >
>> >
>> >Attached are the pom dependencies:
>> ><dependencies>
>> >       <dependency>
>> >           <groupId>org.apache.flink</groupId>
>> >           <artifactId>flink-java</artifactId>
>> >           <version>${flink.version}</version>
>> >       </dependency>
>> >       <dependency>
>> >           <groupId>org.apache.flink</groupId>
>> >           <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>> >           <version>${flink.version}</version>
>> >       </dependency>
>> >       <dependency>
>> >           <groupId>org.apache.flink</groupId>
>> >           <artifactId>flink-table-api-java</artifactId>
>> >           <version>${flink.version}</version>
>> >       </dependency>
>> >       <dependency>
>> >           <groupId>org.apache.flink</groupId>
>> >           <artifactId>flink-connector-kafka_2.12</artifactId>
>> >           <version>${flink.version}</version>
>> >       </dependency>
>> >       <dependency>
>> >           <groupId>org.apache.flink</groupId>
>> >           <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>> >           <version>${flink.version}</version>
>> >       </dependency>
>> >       <dependency>
>> >           <groupId>org.apache.flink</groupId>
>> >           <artifactId>flink-json</artifactId>
>> >           <version>${flink.version}</version>
>> >       </dependency>
>> >   </dependencies>
>> >
>> >
>> >Thanks, everyone!
>>
