This is probably related to how you package the jar. Does the program run correctly when you launch it directly from IDEA?

If it runs in IDEA but the packaged jar fails when submitted, the problem is most likely the SPI files (META-INF/services).
If you are using the shade plugin, take a look at this transformer [1].

[1]
https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer
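
For reference, a minimal sketch of a shade-plugin configuration that merges the SPI files. This uses the ServicesResourceTransformer (which merges every file under META-INF/services; the AppendingTransformer from the link above can instead target one specific resource). The plugin version shown is an assumption — adjust it to your build:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <!-- version is illustrative; use whatever matches your build -->
  <version>3.2.4</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merges META-INF/services entries from all dependency jars,
               so the kafka factory entry is not overwritten by another jar's
               org.apache.flink.table.factories.Factory file. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Without such a transformer, jars that each ship a META-INF/services/org.apache.flink.table.factories.Factory file overwrite one another in the shaded jar, which produces exactly the "Could not find any factory for identifier 'kafka'" error below.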

On Fri, Jul 24, 2020 at 5:02 PM, RS <[email protected]> wrote:

> hi,
> On Flink 1.11.1 I am trying to run a SQL DDL job that reads from Kafka, and executing the CREATE statement fails.
> The jar was built as jar-with-dependencies.
>
>
> Code snippet:
>     public String ddlSql = String.format("CREATE TABLE %s (\n" +
>             "  number BIGINT,\n" +
>             "  msg STRING,\n" +
>             "  username STRING,\n" +
>             "  update_time TIMESTAMP(3)\n" +
>             ") WITH (\n" +
>             " 'connector' = 'kafka',\n" +
>             " 'topic' = '%s',\n" +
>             " 'properties.bootstrap.servers' = '%s',\n" +
>             " 'properties.group.id' = '%s',\n" +
>             " 'format' = 'json',\n" +
>             " 'json.fail-on-missing-field' = 'false',\n" +
>             " 'json.ignore-parse-errors' = 'true'\n" +
>             ")\n", tableName, topic, servers, group);
>
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.executeSql(ddlSql);
>
>
> Error message:
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'kafka' that implements
> 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
> Available factory identifiers are:
> datagen
> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> ... 33 more
>
>
> I referred to this thread:
> http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
> and added flink-connector-kafka_2.12 and flink-sql-connector-kafka_2.12, but the same error still occurs.
>
>
> pom dependencies attached:
> <dependencies>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-java</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java-bridge_2.12</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-table-api-java</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-connector-kafka_2.12</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-sql-connector-kafka_2.12</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>         <dependency>
>             <groupId>org.apache.flink</groupId>
>             <artifactId>flink-json</artifactId>
>             <version>${flink.version}</version>
>         </dependency>
>     </dependencies>
>
>
> Thanks, everyone!



-- 

Best,
Benchao Li
