我指定的是0.11版本kafka, 之前lib目录下 只有 0.11 jar的文件, 还是报这个错误的





在 2019-10-29 13:47:34,"如影随形" <[email protected]> 写道:
>你好:
>
>
>&nbsp; &nbsp; &nbsp; 看着像是包冲突了,你lib包下有4个flink_kafka的jar包,去掉不用的看看呢
>
>
>
>陈浩
>
>
>&nbsp;
>
>
>
>
>
>------------------&nbsp;原始邮件&nbsp;------------------
>发件人:&nbsp;"hb"<[email protected]&gt;;
>发送时间:&nbsp;2019年10月29日(星期二) 下午2:41
>收件人:&nbsp;"user-zh"<[email protected]&gt;;
>
>主题:&nbsp;flink1.9.1 kafka表读取问题
>
>
>
>代码本地ide 能正常执行, 有正常输出,
>
>
>打包成fat-jar包后,提交到yarn-session 上执行
>报:
>Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> for configuration key.deserializer: Class 
>org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
> could not be found.
>
>
>请教下是什么原因?
>
>
>lib目录下文件为:
>flink-dist_2.11-1.9.1.jar&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 
>flink-sql-connector-kafka-0.10_2.11-1.9.0.jar&nbsp; 
>flink-sql-connector-kafka_2.11-1.9.0.jar&nbsp; 
>log4j-1.2.17.jar
>flink-json-1.9.0-sql-jar.jar
>flink-sql-connector-kafka-0.11_2.11-1.9.0.jar&nbsp; 
>flink-table_2.11-1.9.1.jar&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 
>slf4j-log4j12-1.7.15.jar
>flink-shaded-hadoop-2-uber-2.6.5-7.0.jar&nbsp; 
>flink-sql-connector-kafka-0.9_2.11-1.9.0.jar&nbsp;&nbsp; 
>flink-table-blink_2.11-1.9.1.jar
>
>
>
>
>
>
>代码:
>```
>import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
>import org.apache.flink.table.api.EnvironmentSettings
>import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
>import org.apache.flink.types.Row
>
>object StreamingTable2 extends App{
>&nbsp; val env = StreamExecutionEnvironment.getExecutionEnvironment
>&nbsp; val settings: EnvironmentSettings = 
>EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>&nbsp; val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, 
>settings)
>&nbsp; env.setParallelism(2)
>
>&nbsp; val sourceDDL1 =
>&nbsp;&nbsp;&nbsp; """create table kafka_json_source(
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> `timestamp` BIGINT,
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> id int,
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> name varchar
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> ) with (
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.type' = 'kafka',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.version' = '0.11',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.topic' = 'hbtest2',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.startup-mode' = 'earliest-offset',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.properties.0.key' = 'bootstrap.servers',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.properties.0.value' = '192.168.1.160:19092',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.properties.1.key' = 'group.id',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.properties.1.value' = 'groupId1',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.properties.2.key' = 'zookeeper.connect',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'connector.properties.2.value' = '192.168.1.160:2181',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'update-mode' = 'append',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'format.type' = 'json',
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> 'format.derive-schema' = 'true'
>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
> )
>&nbsp;&nbsp;&nbsp; """
>
>&nbsp; tEnv.sqlUpdate(sourceDDL1)
>&nbsp; tEnv.sqlQuery("select * from 
>kafka_json_source").toAppendStream[Row].print()
>&nbsp; env.execute("table-example2")
>}
>```

回复