Hi,
Of the four flink_kafka jars under lib, keep only one and remove the rest, then try again (see the note after the quoted message below).

------------------ Original message ------------------
From: "hb" <[email protected]>
Date: October 29, 2019 (Tuesday) 2:41 PM
To: "user-zh" <[email protected]>
Subject: flink1.9.1 kafka error

The code runs fine in the IDE, but after packaging it as a fat-jar and submitting it to a yarn-session, it fails with:

Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer for configuration key.deserializer: Class org.apache.flink.kafka010.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer could not be found.

What is causing this?

The jars in the lib directory are:

flink-dist_2.11-1.9.1.jar
flink-sql-connector-kafka-0.10_2.11-1.9.0.jar
flink-sql-connector-kafka_2.11-1.9.0.jar
log4j-1.2.17.jar
flink-json-1.9.0-sql-jar.jar
flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
flink-table_2.11-1.9.1.jar
slf4j-log4j12-1.7.15.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
flink-sql-connector-kafka-0.9_2.11-1.9.0.jar
flink-table-blink_2.11-1.9.1.jar

Code:

```
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.{StreamTableEnvironment, _}
import org.apache.flink.types.Row

object StreamingTable2 extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val settings: EnvironmentSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
  env.setParallelism(2)

  val sourceDDL1 =
    """create table kafka_json_source(
         `timestamp` BIGINT,
         id int,
         name varchar
       ) with (
         'connector.type' = 'kafka',
         'connector.version' = '0.11',
         'connector.topic' = 'hbtest2',
         'connector.startup-mode' = 'earliest-offset',
         'connector.properties.0.key' = 'bootstrap.servers',
         'connector.properties.0.value' = '192.168.1.160:19092',
         'connector.properties.1.key' = 'group.id',
         'connector.properties.1.value' = 'groupId1',
         'connector.properties.2.key' = 'zookeeper.connect',
         'connector.properties.2.value' = '192.168.1.160:2181',
         'update-mode' = 'append',
         'format.type' = 'json',
         'format.derive-schema' = 'true'
       )
    """

  tEnv.sqlUpdate(sourceDDL1)
  tEnv.sqlQuery("select * from kafka_json_source").toAppendStream[Row].print()
  env.execute("table-example2")
}
```
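For reference, since the DDL above pins 'connector.version' = '0.11', the single connector jar to keep would be flink-sql-connector-kafka-0.11_2.11-1.9.0.jar. This is just a sketch, assuming your brokers really are Kafka 0.11; keep whichever one connector jar matches your cluster. A cleaned-up lib directory would then look like:

flink-dist_2.11-1.9.1.jar
flink-json-1.9.0-sql-jar.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
flink-sql-connector-kafka-0.11_2.11-1.9.0.jar
flink-table_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar

With several connector jars on the classpath at once, connector discovery can pick up classes from a different version than the one named in the DDL, which matches the error above: the DDL asks for 0.11, but the ByteArrayDeserializer it fails to load comes from the 0.10 jar's shaded namespace.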
