?????? maven??pom????????????????????
???? ------------------ ???????? ------------------ ??????: "hb"<[email protected]>; ????????: 2019??10??29??(??????) ????2:53 ??????: "user-zh"<[email protected]>; ????: Re:??????flink1.9.1 kafka?????????? ??????????0.11????kafka, ????lib?????? ???? 0.11 jar??????, ???????????????? ?? 2019-10-29 13:47:34??"????????" <[email protected]> ?????? >?????? > > >&nbsp; &nbsp; &nbsp; ????????????????????lib??????4??flink_kafka??jar???????????????????? > > > >???? > > >&nbsp; > > > > > >------------------&nbsp;????????&nbsp;------------------ >??????:&nbsp;"hb"<[email protected]&gt;; >????????:&nbsp;2019??10??29??(??????) ????2:41 >??????:&nbsp;"user-zh"<[email protected]&gt;; > >????:&nbsp;flink1.9.1 kafka?????????? > > > >????????ide ??????????, ??????????, > > >??????fat-jar????,??????yarn-session ?????? >??: >Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found. > > >????????????????? > > >lib????????????: >flink-dist_2.11-1.9.1.jar&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >flink-sql-connector-kafka-0.10_2.11-1.9.0.jar&nbsp; >flink-sql-connector-kafka_2.11-1.9.0.jar&nbsp; >log4j-1.2.17.jar >flink-json-1.9.0-sql-jar.jar >flink-sql-connector-kafka-0.11_2.11-1.9.0.jar&nbsp; >flink-table_2.11-1.9.1.jar&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; >slf4j-log4j12-1.7.15.jar >flink-shaded-hadoop-2-uber-2.6.5-7.0.jar&nbsp; >flink-sql-connector-kafka-0.9_2.11-1.9.0.jar&nbsp;&nbsp; >flink-table-blink_2.11-1.9.1.jar > > > > > > >????: >``` >import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _} >import org.apache.flink.table.api.EnvironmentSettings >import org.apache.flink.table.api.scala.{StreamTableEnvironment, _} >import org.apache.flink.types.Row > >object StreamingTable2 extends App{ >&nbsp; val env = StreamExecutionEnvironment.getExecutionEnvironment >&nbsp; val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() >&nbsp; val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings) >&nbsp; env.setParallelism(2) > >&nbsp; val sourceDDL1 = >&nbsp;&nbsp;&nbsp; """create table kafka_json_source( >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; `timestamp` BIGINT, >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; id int, >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; name varchar >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) with ( >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.type' = 'kafka', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.version' = '0.11', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.topic' = 'hbtest2', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.startup-mode' = 'earliest-offset', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.properties.0.key' = 'bootstrap.servers', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.properties.0.value' = '192.168.1.160:19092', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.properties.1.key' = 'group.id', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.properties.1.value' = 'groupId1', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.properties.2.key' = 'zookeeper.connect', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'connector.properties.2.value' = '192.168.1.160:2181', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'update-mode' = 'append', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format.type' = 'json', >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; 'format.derive-schema' = 'true' >&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ) >&nbsp;&nbsp;&nbsp; """ > >&nbsp; tEnv.sqlUpdate(sourceDDL1) >&nbsp; tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print() >&nbsp; env.execute("table-example2") >} >```
