pom 文件
```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hb</groupId>
<artifactId>flink</artifactId>
<packaging>pom</packaging>
<version>1.9.1-SNAPSHOT</version>
<properties>
  <!-- Java language level and source encoding consumed by the compiler plugin. -->
  <java.version>1.8</java.version>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  <!-- Flink and Scala versions. -->
  <flink.version>1.9.1</flink.version>
  <scala.binary.version>2.11</scala.binary.version>
  <scala.version>2.11.12</scala.version>

  <!-- Third-party library versions. -->
  <elasticsearch.hadoop>6.2.3</elasticsearch.hadoop>
  <jcommander.version>1.72</jcommander.version>
  <gson.version>2.6.2</gson.version>
  <kafka.version>0.11.0.2</kafka.version>
  <fastjson.version>1.2.46</fastjson.version>
  <!--
    Kept as its own property so existing references keep resolving, but derived
    from flink.version so the Kafka connector cannot drift from the Flink release.
  -->
  <flink-connector-kafka>${flink.version}</flink-connector-kafka>
  <log4j.version>1.2.17</log4j.version>
  <mysql-connector-java.version>5.1.42</mysql-connector-java.version>
  <net.dongliu.requests.version>4.18.1</net.dongliu.requests.version>

  <!-- Build plugin versions. -->
  <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>

  <!--
    Dependency scopes. "compile" puts the artifacts on every classpath and lets
    them be bundled; the commented-out "provided" alternative is for environments
    that already supply the Flink jars at runtime.
  -->
  <flink.scope.type>compile</flink.scope.type>
  <!--<flink.scope.type>provided</flink.scope.type>-->
  <scope.type>compile</scope.type>
</properties>
<dependencies>
  <!-- Flink core / DataStream APIs. Scope is controlled by flink.scope.type. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>${flink.scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${flink.scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${flink.scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>${flink.scope.type}</scope>
  </dependency>

  <!-- Flink Table API / SQL (legacy planner and Blink planner). -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-common</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
  </dependency>

  <!--
    Flink connectors and formats.
    NOTE(review): both the Kafka 0.11 connector and the universal Kafka connector
    are declared here, while kafka-clients is pinned to kafka.version further
    down. Presumably only one connector is needed for a given
    'connector.version'; confirm and drop the unused one so the build does not
    mix kafka-clients versions.
  -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink-connector-kafka}</version>
  </dependency>
  <!-- Dependency required for Kafka DDL. -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- End of Flink dependencies. -->

  <!-- Elasticsearch Hadoop integration. -->
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>${elasticsearch.hadoop}</version>
    <scope>${scope.type}</scope>
  </dependency>

  <!-- Scala runtime. -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    <scope>${scope.type}</scope>
  </dependency>
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-reflect</artifactId>
    <version>${scala.version}</version>
    <scope>${scope.type}</scope>
  </dependency>

  <!-- Kafka client library (version pinned by kafka.version). -->
  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    <!-- Disabled lz4 exclusion, kept for reference:
    <exclusions>
      <exclusion>
        <artifactId>lz4</artifactId>
        <groupId>net.jpountz.lz4</groupId>
      </exclusion>
    </exclusions>
    -->
  </dependency>

  <!-- Command-line argument parsing. -->
  <dependency>
    <groupId>com.beust</groupId>
    <artifactId>jcommander</artifactId>
    <version>${jcommander.version}</version>
  </dependency>

  <!-- JSON libraries. -->
  <dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>${gson.version}</version>
  </dependency>
  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>${fastjson.version}</version>
  </dependency>

  <!-- Logging. -->
  <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
  </dependency>

  <!-- MySQL JDBC driver. -->
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql-connector-java.version}</version>
  </dependency>

  <!-- HTTP client. -->
  <dependency>
    <groupId>net.dongliu</groupId>
    <artifactId>requests</artifactId>
    <version>${net.dongliu.requests.version}</version>
  </dependency>
</dependencies>
<build>
  <plugins>
    <!-- Compiles Java sources at the configured language level and encoding. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-compiler-plugin</artifactId>
      <version>${maven-compiler-plugin.version}</version>
      <configuration>
        <source>${java.version}</source>
        <target>${java.version}</target>
        <encoding>${project.build.sourceEncoding}</encoding>
      </configuration>
    </plugin>

    <!--
      Compiles Scala sources. The executions are bound to the resource phases,
      which run before the default Java compile / test-compile phases.
    -->
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.3.2</version>
      <executions>
        <execution>
          <id>scala-compile-first</id>
          <phase>process-resources</phase>
          <goals>
            <goal>compile</goal>
          </goals>
        </execution>
        <execution>
          <id>scala-test-compile-first</id>
          <phase>process-test-resources</phase>
          <goals>
            <goal>testCompile</goal>
          </goals>
        </execution>
        <execution>
          <id>attach-scaladocs</id>
          <phase>verify</phase>
          <goals>
            <goal>doc-jar</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

    <!-- Builds the fat jar during the package phase. -->
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>3.1.1</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <!-- Drop signature files: they no longer match the repackaged jar. -->
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                  <exclude>defaults.yaml</exclude>
                </excludes>
              </filter>
            </filters>
            <transformers>
              <!--
                Merge META-INF/services files from all bundled jars instead of
                letting one overwrite the others. Without this, SPI entries such
                as Flink's table factory registrations can be lost from the fat jar.
              -->
              <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
            </transformers>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
</project>
```
在 2019-10-29 14:05:51,"如影随形" <[email protected]> 写道:
>你好:
> maven的pom文件能贴出来看一下吗
>
>
>
>陈浩
>
>
>
>
>
>
>
>
>------------------ 原始邮件 ------------------
>发件人: "hb"<[email protected]>;
>发送时间: 2019年10月29日(星期二) 下午2:53
>收件人: "user-zh"<[email protected]>;
>
>主题: Re:回复:flink1.9.1 kafka表读取问题
>
>
>
>
>
>
>我指定的是0.11版本kafka, 之前lib目录下 只有 0.11 jar的文件, 还是报这个错误的
>
>
>
>
>
>在 2019-10-29 13:47:34,"如影随形" <[email protected]> 写道:
>>你好:
>>
>>
>>     看着像是包冲突了,你lib包下有4个flink_kafka的jar包,去掉不用的看看呢
>>
>>
>>
>>陈浩
>>
>>
>>&nbsp;
>>
>>
>>
>>
>>
>>------------------ 原始邮件 ------------------
>>发件人: "hb"<[email protected]>;
>>发送时间: 2019年10月29日(星期二) 下午2:41
>>收件人: "user-zh"<[email protected]>;
>>
>>主题: flink1.9.1 kafka表读取问题
>>
>>
>>
>>代码本地ide 能正常执行, 有正常输出,
>>
>>
>>打包成fat-jar包后,提交到yarn-session 上执行
>>报:
>>Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.
>>
>>
>>请教下是什么原因?
>>
>>
>>lib目录下文件为:
>>flink-dist_2.11-1.9.1.jar
>>flink-sql-connector-kafka-0.10_2.11-1.9.0.jar
>>flink-sql-connector-kafka_2.11-1.9.0.jar
>>log4j-1.2.17.jar
>>flink-json-1.9.0-sql-jar.jar
>>flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
>>flink-table_2.11-1.9.1.jar
>>slf4j-log4j12-1.7.15.jar
>>flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
>>flink-sql-connector-kafka-0.9_2.11-1.9.0.jar
>>flink-table-blink_2.11-1.9.1.jar
>>
>>
>>
>>
>>
>>
>>代码:
>>```
>>import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>>import org.apache.flink.table.api.EnvironmentSettings
>>import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>>import org.apache.flink.types.Row
>>
>>object StreamingTable2 extends App{
>>  val env = StreamExecutionEnvironment.getExecutionEnvironment
>>  val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>>  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
>>  env.setParallelism(2)
>>
>>  val sourceDDL1 =
>>    """create table kafka_json_source(
>>                            `timestamp` BIGINT,
>>                            id int,
>>                            name varchar
>>                          ) with (
>>                            'connector.type' = 'kafka',
>>                            'connector.version' = '0.11',
>>                            'connector.topic' = 'hbtest2',
>>                            'connector.startup-mode' = 'earliest-offset',
>>                            'connector.properties.0.key' = 'bootstrap.servers',
>>                            'connector.properties.0.value' = '192.168.1.160:19092',
>>                            'connector.properties.1.key' = 'group.id',
>>                            'connector.properties.1.value' = 'groupId1',
>>                            'connector.properties.2.key' = 'zookeeper.connect',
>>                            'connector.properties.2.value' = '192.168.1.160:2181',
>>                            'update-mode' = 'append',
>>                            'format.type' = 'json',
>>                            'format.derive-schema' = 'true'
>>                          )
>>    """
>>
>>  tEnv.sqlUpdate(sourceDDL1)
>>  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
>>  env.execute("table-example2")
>>}
>>```