感谢,解决了, 指定 'connector.version' = '0.11' 就可以了. Blink SQL这方面的官方资料和文档好少啊,开发容易遇到问题.
在 2019-08-26 14:26:15,"hb" <343122...@163.com> 写道: >kafka版本是 kafka_2.11-1.1.0, >支持的kafka版本有哪些 >在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <pengcheng...@bonc.com.cn> 写道: >>检查一下代码的kafka版本,可能是这方面的错误 >> >> >> >>pengcheng...@bonc.com.cn >> >>发件人: hb >>发送时间: 2019-08-26 15:14 >>收件人: user-zh >>主题: Re:Re: flink1.9 blink planner table ddl 使用问题 >>之前少了 flink-connector-kafka_2.11 依赖, >>现在错误变成 Caused by: java.lang.NoSuchMethodError: >>org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V >>了 >> >> >>pom依赖: >>``` >> <dependencies> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-core</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-clients_2.11</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-scala_2.11</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-streaming-scala_2.11</artifactId> >> <version>${flink.version}</version> >> >> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table</artifactId> >> <version>1.9.0</version> >> <type>pom</type> >> <scope>provided</scope> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-common</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-cep-scala_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-filesystem_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-api-scala-bridge_2.11</artifactId> >> <version>${flink.version}</version> >><!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-api-java-bridge_2.11</artifactId> >> <version>${flink.version}</version> >> <!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-planner_2.11</artifactId> >> <version>${flink.version}</version> >> <!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-runtime-blink_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-table-planner-blink_2.11</artifactId> >> <version>${flink.version}</version> >> <!-- <scope>provided</scope>--> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> >> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-connector-kafka_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-json</artifactId> >> <version>${flink.version}</version> >> </dependency> >> <dependency> >> <groupId>org.apache.flink</groupId> >> <artifactId>flink-runtime-web_2.11</artifactId> >> <version>${flink.version}</version> >> </dependency> >> </dependencies> >> >> >>``` >> >> >> >> >> >> >> >> >>在 2019-08-26 13:37:51,"Jark Wu" <imj...@gmail.com> 写道: >>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11 >>> >>>Best, >>>Jark >>> >>>> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道: >>>> >>>> 使用了你的ddl语句,还是报一样的错误. >>>> 我是在idea里面执行的,maven 配置的依赖. >>>> >>>> 在 2019-08-26 11:22:20,"Jark Wu" <imj...@gmail.com> 写道: >>>>> Hi, >>>>> >>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。 >>>>> >>>>> 1. 缺少format properties >>>>> 2. 缺少 connector.version >>>>> 3. bootstrap.severs 的配置方式写的不对... >>>>> >>>>> >>>>> 你可以参考下面这个作为example: >>>>> >>>>> >>>>> CREATE TABLE kafka_json_source ( >>>>> rowtime TIMESTAMP, >>>>> user_name VARCHAR, >>>>> event ROW<message_type VARCHAR, message VARCHAR> >>>>> ) WITH ( >>>>> 'connector.type' = 'kafka', >>>>> 'connector.version' = 'universal', >>>>> 'connector.topic' = 'test-json', >>>>> 'connector.startup-mode' = 'earliest-offset', >>>>> 'connector.properties.0.key' = 'zookeeper.connect', >>>>> 'connector.properties.0.value' = 'localhost:2181', >>>>> 'connector.properties.1.key' = 'bootstrap.servers', >>>>> 'connector.properties.1.value' = 'localhost:9092', >>>>> 'update-mode' = 'append', >>>>> 'format.type' = 'json', >>>>> 'format.derive-schema' = 'true' >>>>> ); >>>>> >>>>> >>>>> Kafka 中的数据长这个样子: >>>>> >>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": { >>>>> "message_type": "WARNING", "message": "This is a warning."}} >>>>> >>>>> >>>>> Best, >>>>> Jark >>>>> >>>>> >>>>>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道: >>>>>> >>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是 >>>>>> 需要实现TableSourceFactory,还是其他什么. >>>>>> >>>>>> >>>>>> 提示: >>>>>> Exception in thread "main" >>>>>> org.apache.flink.table.api.ValidationException: SQL validation failed. >>>>>> findAndCreateTableSource failed. >>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: >>>>>> Could not find a suitable table factory for >>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in >>>>>> the classpath. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> 代码: >>>>>> ``` >>>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, >>>>>> _} >>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types} >>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >>>>>> import org.apache.flink.types.Row >>>>>> >>>>>> >>>>>> object KafkaInDDL extends App { >>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment >>>>>> val settings: EnvironmentSettings = >>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >>>>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, >>>>>> settings) >>>>>> >>>>>> >>>>>> val sourceDDL = >>>>>> """create table sourceTable( >>>>>> id int, >>>>>> name varchar >>>>>> ) with ( >>>>>> 'connector.type' = 'kafka', >>>>>> 'connector.property-version' = '1', >>>>>> 'update-mode' = 'append', >>>>>> 'bootstrap.servers' = '192.168.1.160:19092', >>>>>> 'connector.topic' = 'hbtest1', >>>>>> 'connector.startup-mode' = 'earliest-offset' >>>>>> ) >>>>>> """ >>>>>> tEnv.sqlUpdate(sourceDDL) >>>>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print() >>>>>> tEnv.execute("") >>>>>> } >>>>>> ``` >>>>>