Hi, at first glance several parts of your DDL are defined incorrectly.
1. The format properties are missing.
2. connector.version is missing.
3. bootstrap.servers is configured in the wrong way...

You can use the following as an example:

    CREATE TABLE kafka_json_source (
        rowtime TIMESTAMP,
        user_name VARCHAR,
        event ROW<message_type VARCHAR, message VARCHAR>
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'test-json',
        'connector.startup-mode' = 'earliest-offset',
        'connector.properties.0.key' = 'zookeeper.connect',
        'connector.properties.0.value' = 'localhost:2181',
        'connector.properties.1.key' = 'bootstrap.servers',
        'connector.properties.1.value' = 'localhost:9092',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );

The data in Kafka looks like this:

    {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { "message_type": "WARNING", "message": "This is a warning."}}

Best,
Jark

> On Aug 26, 2019, at 09:52, hb <343122...@163.com> wrote:
>
> With Flink 1.9 and the Blink planner, creating a table with a DDL statement fails. I don't know whether I am missing some properties in the definition, whether I need to implement a TableSourceFactory, or whether it is something else.
>
>
> Error message:
> Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>
>
> Code:
> ```
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.table.api.{EnvironmentSettings, Types}
> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
> import org.apache.flink.types.Row
>
>
> object KafkaInDDL extends App {
>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>   val settings: EnvironmentSettings =
>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>   val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>
>
>   val sourceDDL =
>     """create table sourceTable(
>       id int,
>       name varchar
>     ) with (
>       'connector.type' = 'kafka',
>       'connector.property-version' = '1',
>       'update-mode' = 'append',
>       'bootstrap.servers' = '192.168.1.160:19092',
>       'connector.topic' = 'hbtest1',
>       'connector.startup-mode' = 'earliest-offset'
>     )
>     """
>   tEnv.sqlUpdate(sourceDDL)
>   tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>   tEnv.execute("")
> }
> ```
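
For reference, here is a rough sketch of how the sourceTable DDL quoted above might look once the three points are addressed. It is not a verified fix. It assumes that the messages in hbtest1 are JSON objects with id and name fields (the original message does not say which format the topic uses) and that the universal Kafka connector matches your broker version; adjust both if that is not the case.

```
CREATE TABLE sourceTable (
    id INT,
    name VARCHAR
) WITH (
    'connector.type' = 'kafka',
    -- point 2: connector.version was missing ('universal' is an assumption)
    'connector.version' = 'universal',
    'connector.property-version' = '1',
    'connector.topic' = 'hbtest1',
    'connector.startup-mode' = 'earliest-offset',
    -- point 3: Kafka client settings go through indexed key/value pairs
    'connector.properties.0.key' = 'bootstrap.servers',
    'connector.properties.0.value' = '192.168.1.160:19092',
    'update-mode' = 'append',
    -- point 1: format properties were missing (JSON is an assumption)
    'format.type' = 'json',
    'format.derive-schema' = 'true'
)
```

This text would replace the value of sourceDDL that is passed to tEnv.sqlUpdate. As far as I know, the same NoMatchingTableFactoryException can also show up when the Kafka connector or JSON format dependency is not on the classpath, so that is worth checking as well.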