可能跟你的打包方式有关系。你这个程序如果直接在idea里面运行是可以运行的么?
如果可以在idea运行,但是打出来的jar包不能提交运行的话,很有可能跟SPI文件有关系。 如果你用的是shade plugin,需要看下这个transformer[1] [1] https://maven.apache.org/plugins/maven-shade-plugin/examples/resource-transformers.html#AppendingTransformer RS <[email protected]> 于2020年7月24日周五 下午5:02写道: > hi, > Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 > 编译的jar包是jar-with-dependencies的 > > > 代码片段: > public String ddlSql = String.format("CREATE TABLE %s (\n" + > " number BIGINT,\n" + > " msg STRING,\n" + > " username STRING,\n" + > " update_time TIMESTAMP(3)\n" + > ") WITH (\n" + > " 'connector' = 'kafka',\n" + > " 'topic' = '%s',\n" + > " 'properties.bootstrap.servers' = '%s',\n" + > " 'properties.group.id' = '%s',\n" + > " 'format' = 'json',\n" + > " 'json.fail-on-missing-field' = 'false',\n" + > " 'json.ignore-parse-errors' = 'true'\n" + > ")\n", tableName, topic, servers, group); > > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(env); > tableEnv.executeSql(ddlSql); > > > 报错信息: > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any factory for identifier 'kafka' that implements > 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the > classpath. > Available factory identifiers are: > datagen > at > org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) > ... 33 more > > > 参考了这个 > http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 > 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错 > > > 附上pom依赖: > <dependencies> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-java</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java-bridge_2.12</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-table-api-java</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-connector-kafka_2.12</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-sql-connector-kafka_2.12</artifactId> > <version>${flink.version}</version> > </dependency> > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-json</artifactId> > <version>${flink.version}</version> > </dependency> > </dependencies> > > > 感谢各位~ -- Best, Benchao Li
