hi,
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
编译的jar包是jar-with-dependencies的
代码片段:
public String ddlSql = String.format("CREATE TABLE %s (\n" +
" number BIGINT,\n" +
" msg STRING,\n" +
" username STRING,\n" +
" update_time TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'properties.group.id' = '%s',\n" +
" 'format' = 'json',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")\n", tableName, topic, servers, group);
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(ddlSql);
报错信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any
factory for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more
参考了这个
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错
附上pom依赖:
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
感谢各位~