kafka版本是 kafka_2.11-1.1.0,
支持的kafka版本有哪些
在 2019-08-26 14:23:19,"pengcheng...@bonc.com.cn" <pengcheng...@bonc.com.cn> 写道:
>检查一下代码的kafka版本,可能是这方面的错误
>
>
>
>pengcheng...@bonc.com.cn
>
>发件人: hb
>发送时间: 2019-08-26 15:14
>收件人: user-zh
>主题: Re:Re: flink1.9 blink planner table ddl 使用问题
>之前少了 flink-connector-kafka_2.11 依赖,
>现在错误变成 Caused by: java.lang.NoSuchMethodError:
>org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.<init>(Lorg/slf4j/Logger;Lorg/apache/flink/streaming/connectors/kafka/internal/Handover;Ljava/util/Properties;Lorg/apache/flink/streaming/connectors/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V
>了
>
>
>pom依赖:
>```
> <dependencies>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-core</artifactId>
> <version>${flink.version}</version>
>
>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-clients_2.11</artifactId>
> <version>${flink.version}</version>
>
>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-scala_2.11</artifactId>
> <version>${flink.version}</version>
>
>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-streaming-scala_2.11</artifactId>
> <version>${flink.version}</version>
>
>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table</artifactId>
> <version>1.9.0</version>
> <type>pom</type>
> <scope>provided</scope>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-common</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-cep-scala_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-filesystem_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
> <version>${flink.version}</version>
><!-- <scope>provided</scope>-->
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-api-java-bridge_2.11</artifactId>
> <version>${flink.version}</version>
> <!-- <scope>provided</scope>-->
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner_2.11</artifactId>
> <version>${flink.version}</version>
> <!-- <scope>provided</scope>-->
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-runtime-blink_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-table-planner-blink_2.11</artifactId>
> <version>${flink.version}</version>
> <!-- <scope>provided</scope>-->
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
>
>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-connector-kafka_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-json</artifactId>
> <version>${flink.version}</version>
> </dependency>
> <dependency>
> <groupId>org.apache.flink</groupId>
> <artifactId>flink-runtime-web_2.11</artifactId>
> <version>${flink.version}</version>
> </dependency>
> </dependencies>
>
>
>```
>
>
>
>
>
>
>
>
>在 2019-08-26 13:37:51,"Jark Wu" <imj...@gmail.com> 写道:
>>Maven 需要同时依赖 flink-json 和 flink-connector-kafka_2.11
>>
>>Best,
>>Jark
>>
>>> 在 2019年8月26日,13:57,hb <343122...@163.com> 写道:
>>>
>>> 使用了你的ddl语句,还是报一样的错误.
>>> 我是在idea里面执行的,maven 配置的依赖.
>>>
>>> 在 2019-08-26 11:22:20,"Jark Wu" <imj...@gmail.com> 写道:
>>>> Hi,
>>>>
>>>> 初步看下来你的 DDL 中有这几部分定义的有问题。
>>>>
>>>> 1. 缺少format properties
>>>> 2. 缺少 connector.version
>>>> 3. bootstrap.severs 的配置方式写的不对...
>>>>
>>>>
>>>> 你可以参考下面这个作为example:
>>>>
>>>>
>>>> CREATE TABLE kafka_json_source (
>>>> rowtime TIMESTAMP,
>>>> user_name VARCHAR,
>>>> event ROW<message_type VARCHAR, message VARCHAR>
>>>> ) WITH (
>>>> 'connector.type' = 'kafka',
>>>> 'connector.version' = 'universal',
>>>> 'connector.topic' = 'test-json',
>>>> 'connector.startup-mode' = 'earliest-offset',
>>>> 'connector.properties.0.key' = 'zookeeper.connect',
>>>> 'connector.properties.0.value' = 'localhost:2181',
>>>> 'connector.properties.1.key' = 'bootstrap.servers',
>>>> 'connector.properties.1.value' = 'localhost:9092',
>>>> 'update-mode' = 'append',
>>>> 'format.type' = 'json',
>>>> 'format.derive-schema' = 'true'
>>>> );
>>>>
>>>>
>>>> Kafka 中的数据长这个样子:
>>>>
>>>> {"rowtime": "2018-03-12T08:00:00Z", "user_name": "Alice", "event": {
>>>> "message_type": "WARNING", "message": "This is a warning."}}
>>>>
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>>
>>>>> 在 2019年8月26日,09:52,hb <343122...@163.com> 写道:
>>>>>
>>>>> flink1.9 blink planner table 使用ddl 语句,创建表不成功,不知道是少了 定义属性还是
>>>>> 需要实现TableSourceFactory,还是其他什么.
>>>>>
>>>>>
>>>>> 提示:
>>>>> Exception in thread "main"
>>>>> org.apache.flink.table.api.ValidationException: SQL validation failed.
>>>>> findAndCreateTableSource failed.
>>>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>>>> Could not find a suitable table factory for
>>>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>>>> the classpath.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 代码:
>>>>> ```
>>>>> import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment,
>>>>> _}
>>>>> import org.apache.flink.table.api.{EnvironmentSettings, Types}
>>>>> import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>>>>> import org.apache.flink.types.Row
>>>>>
>>>>>
>>>>> object KafkaInDDL extends App {
>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> val settings: EnvironmentSettings =
>>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>>>> val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,
>>>>> settings)
>>>>>
>>>>>
>>>>> val sourceDDL =
>>>>> """create table sourceTable(
>>>>> id int,
>>>>> name varchar
>>>>> ) with (
>>>>> 'connector.type' = 'kafka',
>>>>> 'connector.property-version' = '1',
>>>>> 'update-mode' = 'append',
>>>>> 'bootstrap.servers' = '192.168.1.160:19092',
>>>>> 'connector.topic' = 'hbtest1',
>>>>> 'connector.startup-mode' = 'earliest-offset'
>>>>> )
>>>>> """
>>>>> tEnv.sqlUpdate(sourceDDL)
>>>>> tEnv.sqlQuery("select * from sourceTable").toAppendStream[Row].print()
>>>>> tEnv.execute("")
>>>>> }
>>>>> ```
>>>>