hi????????????????checkpoint?????????????????????????????? Best
| | a511955993 | | [email protected] | ?????? ???????????? ???? ??2020??09??04?? 14:11??op ?????? ???????? ???? ------------------ ???????? ------------------ ??????: "user-zh" <[email protected]>; ????????: 2020??9??4??(??????) ????11:54 ??????: "user-zh"<[email protected]>;"Shuiqiang Chen"<[email protected]>; ????: Re: FlinkKafkaConsumer???? ????????exactly-once??flink????????barrier??????checkpoint??????barrier????????????????flink??kafkaconsumer???????????????????????????????? ?? 2020/9/4 10:34, Shuiqiang Chen ????: > Hi?? > ???????? Flink ?????? exactly-once???????????? Kafka source ???????????????????????? partition ???? offset ??????????????checkpoint ?????????????????? state ???? ???? checkpoint ???????????? commit ???????????????????? exactly-once. ?????? Kafka ???????????????? FlinkKafkaConsumer ?????????????????????? partition ???? Kafka ???????????????? offset ???? Kafka ??????????????????Flink ?????????????????? > >> ?? 2020??9??4????????10:25??op <[email protected]> ?????? >> >> ????????????????????????FlinkKafkaConsumer????????????KafkaConsumer??????????flink????kafka????????????????&nbsp; >> >> >> ------------------&nbsp;????????&nbsp;------------------ >> ??????: "user-zh" <[email protected]&gt;; >> ????????:&nbsp;2020??9??3??(??????) ????6:09 >> ??????:&nbsp;"user-zh"<[email protected]&gt;; >> >> ????:&nbsp;Re: FlinkKafkaConsumer???? >> >> >> >> Hi op, >> >> ?? Flink ???? Kafka ?????????? ?? FlinkKafkaConsumer ???? Kafka ?????????? topic ?????? partition ?????????????????????????????? group id ?????????????? partition ?????? offset commit ?? Kafka???????????????????????????? KafkaConsumer ?????????????????? Kafka ????????????, ???? Kafka ?????????????????? >> >> ???????????????????????????? topic ?? group id ???? kafka?? ?????????????????????????? group id commit offset ??kafka?? ?????? group offset ?????????????????? ?????????????? commit ?? offset ?????????? >> >> &gt; ?? 2020??9??3????????3:03??op <[email protected]&gt; ?????? >> &gt; >> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; ????FlinkKafkaConsumer????????????????&amp;nbsp; &amp;nbsp; ???????????????????????? >> &gt; //--------------------------- >> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment >> &gt; Env.setRestartStrategy(RestartStrategies.noRestart()) >> &gt; val consumerProps = new Properties() >> &gt; consumerProps.put("bootstrap.servers", brokers) >> &gt; consumerProps.put("group.id", "test1234") >> &gt; >> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new KafkaStringSchema,consumerProps).setStartFromLatest() >> &gt; Env.addSource(consumer).print() >> &gt; Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka ??consumer group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11 ????
