Check the Kafka version your code uses; the error may be related to that.
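A quick way to check for a version conflict is to ask the class loader where it finds the class named in the NoSuchMethodError quoted below. This is only a minimal sketch, not something proposed in the thread; the object name `ClasspathCheck` and the helper `locationsOf` are made up for illustration. If it prints more than one location, two jars ship the same class name. That would be consistent with the pom listing both flink-connector-kafka-0.11_2.11 and flink-connector-kafka_2.11, though the thread does not confirm that this is the cause.

```scala
import java.net.URL
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Flink: lists every classpath location
// that provides a given class file.
object ClasspathCheck {
  def locationsOf(resource: String): List[URL] =
    Thread.currentThread().getContextClassLoader
      .getResources(resource)
      .asScala
      .toList

  def main(args: Array[String]): Unit = {
    // The class named in the NoSuchMethodError, written as a resource path.
    val resource =
      "org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.class"
    val found = locationsOf(resource)
    found.foreach(println)
    if (found.size > 1)
      println(s"${found.size} jars provide this class; their versions may conflict.")
  }
}
```

Run it from the same IDEA module as the job so that it sees the same Maven classpath. `mvn dependency:tree` then shows which dependency pulls in each jar.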
pengcheng...@bonc.com.cn

From: hb
Sent: 2019-08-26 15:14
To: user-zh
Subject: Re:Re: flink1.9 blink planner table ddl usage question

The flink-connector-kafka_2.11 dependency was missing before. Now the error has changed to:

Caused by: java.lang.NoSuchMethodError: org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V

pom dependencies:
```
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table</artifactId>
        <version>1.9.0</version>
        <type>pom</type>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```

On 2019-08-26 13:37:51, "Jark Wu" <imj...@gmail.com> wrote:
>Your Maven project needs to depend on both flink-json and flink-connector-kafka_2.11.
>
>Best,
>Jark
>
>> On 2019-08-26, at 13:57, hb <343122...@163.com> wrote:
>>
>> I used your DDL statement, but it still reports the same error.
>> I am running it inside IDEA, with the dependencies configured through Maven.
>>
>> On 2019-08-26 11:22:20, "Jark Wu" <imj...@gmail.com> wrote:
>>> Hi,
>>>
>>> At first glance, these parts of your DDL are defined incorrectly:
>>>
>>> 1. The format properties are missing.
>>> 2. connector.version is missing.
>>> 3. bootstrap.servers is not configured the right way...
>>>
>>>
>>> You can use the following as an example:
>>>
>>>
>>> CREATE TABLE kafka_json_source (
>>>   rowtime TIMESTAMP,
>>>   user_name VARCHAR,
>>>   event ROW<message_type VARCHAR, message VARCHAR>
>>> ) WITH (
>>>   'connector.type' = 'kafka',
>>>   'connector.version' = 'universal',
>>>   'connector.topic' = 'test-json',
>>>   'connector.startup-mode' = 'earliest-offset',
>>>   'connector.properties.0.key' = 'zookeeper.connect',
>>>   'connector.properties.0.value' = 'localhost:2181',
>>>   'connector.properties.1.key' = 'bootstrap.servers',
>>>   'connector.properties.1.value' = 'localhost:9092',
>>>   'update-mode' = 'append',
>>>   'format.type' = 'json',
>>>   'format.derive-schema' = 'true'
>>> );
>>>
>>>
>>> The data in Kafka looks like this:
>>>
>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": {"message_type": "WARNING", "message": "This is a warning."}}
>>>
>>>
>>> Best,
>>> Jark
>>>
>>>
>>>> On 2019-08-26, at 09:52, hb <343122...@163.com> wrote:
>>>>
>>>> With the flink1.9 blink planner, creating a table with a DDL statement fails.
>>>> I don't know whether I am missing some property definitions, need to
>>>> implement a TableSourceFactory, or something else.
>>>>
>>>>
>>>> Error message:
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException:
>>>> SQL validation failed. findAndCreateTableSource failed.
>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>> Could not find a suitable table factory for
>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>> the classpath.
>>>>
>>>>
>>>> Code:
>>>> ```
>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>>>> import org.apache.flink.types.Row
>>>>
>>>>
>>>> object KafkaInDDL extends App {
>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>   val settings: EnvironmentSettings =
>>>>     EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>   val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>>>>
>>>>
>>>>   val sourceDDL =
>>>>     """create table sourceTable(
>>>>          id int,
>>>>          name varchar
>>>>        ) with (
>>>>          'connector.type' = 'kafka',
>>>>          'connector.property-version' = '1',
>>>>          'update-mode' = 'append',
>>>>          'bootstrap.servers' = '192.168.1.160:19092',
>>>>          'connector.topic' = 'hbtest1',
>>>>          'connector.startup-mode' = 'earliest-offset'
>>>>        )
>>>>     """
>>>>   tEnv.sqlUpdate(sourceDDL)
>>>>   tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>>>   tEnv.execute("")
>>>> }
>>>> ```
>>>
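Applying Jark's three points to the original program gives roughly the following. This is an untested sketch under some assumptions: the object name `KafkaInDDLFixed` is made up, the topic hbtest1 is assumed to hold JSON records with `id` and `name` fields (the thread does not say what format it holds), and the project is assumed to have flink-json and a single Kafka connector version on the classpath. Jark's example also sets `zookeeper.connect`; it is left out here because the thread does not give a ZooKeeper address, so add it if your setup needs it.

```scala
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

// Hypothetical name; the original program with only the DDL changed.
object KafkaInDDLFixed {
  val sourceDDL: String =
    """create table sourceTable(
      |  id int,
      |  name varchar
      |) with (
      |  'connector.type' = 'kafka',
      |  'connector.version' = 'universal',
      |  'connector.property-version' = '1',
      |  'connector.topic' = 'hbtest1',
      |  'connector.startup-mode' = 'earliest-offset',
      |  'connector.properties.0.key' = 'bootstrap.servers',
      |  'connector.properties.0.value' = '192.168.1.160:19092',
      |  'update-mode' = 'append',
      |  'format.type' = 'json',
      |  'format.derive-schema' = 'true'
      |)""".stripMargin

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings: EnvironmentSettings =
      EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)

    // Register the Kafka-backed table, then print every row read from it.
    tEnv.sqlUpdate(sourceDDL)
    tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
    tEnv.execute("")
  }
}
```

The changes from the original DDL are the added `connector.version`, the added `format.*` properties, and `bootstrap.servers` moved into an indexed `connector.properties.N.key` / `connector.properties.N.value` pair, following the layout of Jark's example.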