Hi,

初步看下来你的 DDL 中有这几部分定义的有问题。

1. 缺少format properties
2. 缺少 connector.version
3. bootstrap.severs 的配置方式写的不对...


你可以参考下面这个作为example:


CREATE TABLE kafka_json_source (
    rowtime TIMESTAMP,
    user_name VARCHAR,
    event ROW<message_type VARCHAR, message VARCHAR>
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'test-json',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'zookeeper.connect',
    'connector.properties.0.value' = 'localhost:2181',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);


Kafka 中的数据长这个样子:

{"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { 
"message_type": "WARNING", "message": "This is a warning."}}


Best,
Jark


> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
> 
> flink1.9 blink planner  table  使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 
> 需要实现TableSourceFactory,还是其他什么.
> 
> 
> 提示:  
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> SQL validation failed. findAndCreateTableSource failed.
> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
> not find a suitable table factory for 
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> 
> 
> 
> 
> 代码:
> ```
> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
> import org.apache.flink.table.api.{EnvironmentSettings, Types}
> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
> import org.apache.flink.types.Row
> 
> 
> object KafkaInDDL extends App {
>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>  val settings: EnvironmentSettings = 
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
> settings)
> 
> 
>  val sourceDDL =
>    """create table sourceTable(
>                            id int,
>                            name varchar
>                          ) with (
>                            'connector.type' = 'kafka',
>                            'connector.property-version' = '1',
>                            'update-mode' = 'append',
>                            'bootstrap.servers' = '192.168.1.160:19092',
>                            'connector.topic' = 'hbtest1',
>                            'connector.startup-mode' = 'earliest-offset'
>                          )
>    """
>  tEnv.sqlUpdate(sourceDDL)
>  tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>  tEnv.execute("")
> }
> ```

回复