hi, ????FlinkKafkaConsumer????????????????
????????????????????????
//---------------------------
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
Env.setRestartStrategy(RestartStrategies.noRestart())
val consumerProps = new Properties()
consumerProps.put("bootstrap.servers", brokers)
consumerProps.put("group.id", "test1234")
val consumer = new FlinkKafkaConsumer[String](topic,new
KafkaStringSchema,consumerProps).setStartFromLatest()
Env.addSource(consumer).print()
Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka
??consumer
group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11
????