Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 Best, Jark
> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道: > > 使用了你的ddl语句,还是报一样的错误. > 我是在idea里面执行的,maven 配置的依赖. > > 在 2019-08-26 11:22:20,"Jark Wu" <imj...@gmail.com> 写道: >> Hi, >> >> 初步看下来你的 DDL 中有这几部分定义的有问题。 >> >> 1. 缺少format properties >> 2. 缺少 connector.version >> 3. bootstrap.severs 的配置方式写的不对... >> >> >> 你可以参考下面这个作为example: >> >> >> CREATE TABLE kafka_json_source ( >> rowtime TIMESTAMP, >> user_name VARCHAR, >> event ROW<message_type VARCHAR, message VARCHAR> >> ) WITH ( >> 'connector.type' = 'kafka', >> 'connector.version' = 'universal', >> 'connector.topic' = 'test-json', >> 'connector.startup-mode' = 'earliest-offset', >> 'connector.properties.0.key' = 'zookeeper.connect', >> 'connector.properties.0.value' = 'localhost:2181', >> 'connector.properties.1.key' = 'bootstrap.servers', >> 'connector.properties.1.value' = 'localhost:9092', >> 'update-mode' = 'append', >> 'format.type' = 'json', >> 'format.derive-schema' = 'true' >> ); >> >> >> Kafka 中的数据长这个样子: >> >> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { >> "message_type": "WARNING", "message": "This is a warning."}} >> >> >> Best, >> Jark >> >> >>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道: >>> >>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 >>> 需要实现TableSourceFactory,还是其他什么. >>> >>> >>> 提示: >>> Exception in thread "main" org.apache.flink.table.api.ValidationException: >>> SQL validation failed. findAndCreateTableSource failed. >>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >>> Could not find a suitable table factory for >>> 'org.apache.flink.table.factories.TableSourceFactory' in >>> the classpath. >>> >>> >>> >>> >>> 代码: >>> ``` >>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>> import org.apache.flink.types.Row >>> >>> >>> object KafkaInDDL extends App { >>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>> val settings: EnvironmentSettings = >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, >>> settings) >>> >>> >>> val sourceDDL = >>> """create table sourceTable( >>> id int, >>> name varchar >>> ) with ( >>> 'connector.type' = 'kafka', >>> 'connector.property-version' = '1', >>> 'update-mode' = 'append', >>> 'bootstrap.servers' = '192.168.1.160:19092', >>> 'connector.topic' = 'hbtest1', >>> 'connector.startup-mode' = 'earliest-offset' >>> ) >>> """ >>> tEnv.sqlUpdate(sourceDDL) >>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>> tEnv.execute("") >>> } >>> ``` >>