??????
     maven??pom????????????????????


????


 





------------------ ???????? ------------------
??????:&nbsp;"hb"<[email protected]&gt;;
????????:&nbsp;2019??10??29??(??????) ????2:53
??????:&nbsp;"user-zh"<[email protected]&gt;;

????:&nbsp;Re:??????flink1.9.1 kafka??????????






??????????0.11????kafka, ????lib?????? ???? 0.11 jar??????, ????????????????





?? 2019-10-29 13:47:34??"????????" <[email protected]&gt; ??????
&gt;??????
&gt;
&gt;
&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; 
????????????????????lib??????4??flink_kafka??jar????????????????????
&gt;
&gt;
&gt;
&gt;????
&gt;
&gt;
&gt;&amp;nbsp;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt;??????:&amp;nbsp;"hb"<[email protected]&amp;gt;;
&gt;????????:&amp;nbsp;2019??10??29??(??????) ????2:41
&gt;??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt;????:&amp;nbsp;flink1.9.1 kafka??????????
&gt;
&gt;
&gt;
&gt;????????ide ??????????, ??????????,
&gt;
&gt;
&gt;??????fat-jar????,??????yarn-session ??????
&gt;??:
&gt;Caused by: org.apache.kafka.common.config.ConfigException: Invalid value 
org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
 for configuration key.deserializer: Class 
org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer
 could not be found.
&gt;
&gt;
&gt;?????????????????
&gt;
&gt;
&gt;lib????????????:
&gt;flink-dist_2.11-1.9.1.jar&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 
&gt;flink-sql-connector-kafka-0.10_2.11-1.9.0.jar&amp;nbsp; 
&gt;flink-sql-connector-kafka_2.11-1.9.0.jar&amp;nbsp; 
&gt;log4j-1.2.17.jar
&gt;flink-json-1.9.0-sql-jar.jar
&gt;flink-sql-connector-kafka-0.11_2.11-1.9.0.jar&amp;nbsp; 
&gt;flink-table_2.11-1.9.1.jar&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 
&gt;slf4j-log4j12-1.7.15.jar
&gt;flink-shaded-hadoop-2-uber-2.6.5-7.0.jar&amp;nbsp; 
&gt;flink-sql-connector-kafka-0.9_2.11-1.9.0.jar&amp;nbsp;&amp;nbsp; 
&gt;flink-table-blink_2.11-1.9.1.jar
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;????:
&gt;```
&gt;import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
&gt;import org.apache.flink.table.api.EnvironmentSettings
&gt;import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
&gt;import org.apache.flink.types.Row
&gt;
&gt;object StreamingTable2 extends App{
&gt;&amp;nbsp; val env = StreamExecutionEnvironment.getExecutionEnvironment
&gt;&amp;nbsp; val settings: EnvironmentSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
&gt;&amp;nbsp; val tEnv: StreamTableEnvironment = 
StreamTableEnvironment.create(env, settings)
&gt;&amp;nbsp; env.setParallelism(2)
&gt;
&gt;&amp;nbsp; val sourceDDL1 =
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; """create table kafka_json_source(
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 `timestamp` BIGINT,
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 id int,
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 name varchar
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 ) with (
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.type' = 'kafka',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.version' = '0.11',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.topic' = 'hbtest2',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.startup-mode' = 'earliest-offset',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.properties.0.key' = 'bootstrap.servers',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.properties.0.value' = '192.168.1.160:19092',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.properties.1.key' = 'group.id',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.properties.1.value' = 'groupId1',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.properties.2.key' = 'zookeeper.connect',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'connector.properties.2.value' = '192.168.1.160:2181',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'update-mode' = 'append',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'format.type' = 'json',
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 'format.derive-schema' = 'true'
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
 )
&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp; """
&gt;
&gt;&amp;nbsp; tEnv.sqlUpdate(sourceDDL1)
&gt;&amp;nbsp; tEnv.sqlQuery("select * from 
kafka_json_source").toAppendStream[Row].print()
&gt;&amp;nbsp; env.execute("table-example2")
&gt;}
&gt;```

回复