???????? ????
------------------ ???????? ------------------ ??????: "user-zh" <taochangl...@163.com>; ????????: 2020??9??4??(??????) ????11:54 ??????: "user-zh"<user-zh@flink.apache.org>;"Shuiqiang Chen"<acqua....@gmail.com>; ????: Re: FlinkKafkaConsumer???? ????????exactly-once??flink????????barrier??????checkpoint??????barrier????????????????flink??kafkaconsumer???????????????????????????????? ?? 2020/9/4 10:34, Shuiqiang Chen ????: > Hi?? > ???????? Flink ?????? exactly-once???????????? Kafka source ???????????????????????? partition ???? offset ??????????????checkpoint ?????????????????? state ???? ???? checkpoint ???????????? commit ???????????????????? exactly-once. ?????? Kafka ???????????????? FlinkKafkaConsumer ?????????????????????? partition ???? Kafka ???????????????? offset ???? Kafka ??????????????????Flink ?????????????????? > >> ?? 2020??9??4????????10:25??op <520075...@qq.com> ?????? >> >> ????????????????????????FlinkKafkaConsumer????????????KafkaConsumer??????????flink????kafka????????????????&nbsp; >> >> >> ------------------&nbsp;????????&nbsp;------------------ >> ??????: "user-zh" <acqua....@gmail.com&gt;; >> ????????:&nbsp;2020??9??3??(??????) ????6:09 >> ??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;; >> >> ????:&nbsp;Re: FlinkKafkaConsumer???? >> >> >> >> Hi op, >> >> ?? Flink ???? Kafka ?????????? ?? FlinkKafkaConsumer ???? Kafka ?????????? topic ?????? partition ?????????????????????????????? group id ?????????????? partition ?????? offset commit ?? Kafka???????????????????????????? KafkaConsumer ?????????????????? Kafka ????????????, ???? Kafka ?????????????????? >> >> ???????????????????????????? topic ?? group id ???? kafka?? ?????????????????????????? group id commit offset ??kafka?? ?????? group offset ?????????????????? ?????????????? commit ?? offset ?????????? >> >> &gt; ?? 2020??9??3????????3:03??op <520075...@qq.com&gt; ?????? >> &gt; >> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; ????FlinkKafkaConsumer????????????????&amp;nbsp; &amp;nbsp; ???????????????????????? >> &gt; //--------------------------- >> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> &gt; Env.setRestartStrategy(RestartStrategies.noRestart()) >> &gt; val consumerProps = new Properties() >> &gt; consumerProps.put("bootstrap.servers", brokers) >> &gt; consumerProps.put("group.id", "test1234") >> &gt; >> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() >> &gt; Env.addSource(consumer).print() >> &gt; Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka ??consumer group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11 ????