???????? ????



------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<taochangl...@163.com&gt;;
????????:&nbsp;2020??9??4??(??????) ????11:54
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"Shuiqiang 
Chen"<acqua....@gmail.com&gt;;

????:&nbsp;Re: FlinkKafkaConsumer????



????????exactly-once??flink????????barrier??????checkpoint??????barrier????????????????flink??kafkaconsumer????????????????????????????????

?? 2020/9/4 10:34, Shuiqiang Chen ????:

&gt; Hi??
&gt; ???????? Flink ?????? exactly-once???????????? Kafka source&nbsp; 
???????????????????????? partition ???? offset ??????????????checkpoint 
?????????????????? state ???? ???? checkpoint ???????????? commit 
???????????????????? exactly-once.&nbsp; ?????? Kafka ???????????????? 
FlinkKafkaConsumer ?????????????????????? partition ???? Kafka ???????????????? 
offset ???? Kafka ??????????????????Flink ??????????????????
&gt;
&gt;&gt; ?? 2020??9??4????????10:25??op <520075...@qq.com&gt; ??????
&gt;&gt;
&gt;&gt; 
????????????????????????FlinkKafkaConsumer????????????KafkaConsumer??????????flink????kafka????????????????&amp;nbsp;
&gt;&gt;
&gt;&gt;
&gt;&gt; ------------------&amp;nbsp;????????&amp;nbsp;------------------
&gt;&gt; 
??????:&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 
"user-zh"&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 <acqua....@gmail.com&amp;gt;;
&gt;&gt; ????????:&amp;nbsp;2020??9??3??(??????) ????6:09
&gt;&gt; ??????:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;&gt;
&gt;&gt; ????:&amp;nbsp;Re: FlinkKafkaConsumer????
&gt;&gt;
&gt;&gt;
&gt;&gt;
&gt;&gt; Hi op,
&gt;&gt;
&gt;&gt; ?? Flink ???? Kafka ?????????? ?? FlinkKafkaConsumer ???? Kafka 
?????????? topic ?????? partition ?????????????????????????????? group id 
?????????????? partition ?????? offset commit ?? 
Kafka???????????????????????????? KafkaConsumer ?????????????????? Kafka 
????????????, ???? Kafka ??????????????????
&gt;&gt;
&gt;&gt; ???????????????????????????? topic ?? group id ???? kafka?? 
?????????????????????????? group id commit offset ??kafka?? ?????? group offset 
?????????????????? ?????????????? commit ?? offset ??????????
&gt;&gt;
&gt;&gt; &amp;gt; ?? 2020??9??3????????3:03??op <520075...@qq.com&amp;gt; ??????
&gt;&gt; &amp;gt;
&gt;&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; hi,&amp;amp;nbsp; 
&amp;amp;nbsp; ????FlinkKafkaConsumer????????????????&amp;amp;nbsp; 
&amp;amp;nbsp; ????????????????????????
&gt;&gt; &amp;gt; //---------------------------
&gt;&gt; &amp;gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
&gt;&gt; &amp;gt; Env.setRestartStrategy(RestartStrategies.noRestart())
&gt;&gt; &amp;gt; val consumerProps = new Properties()
&gt;&gt; &amp;gt; consumerProps.put("bootstrap.servers", brokers)
&gt;&gt; &amp;gt; consumerProps.put("group.id", "test1234")
&gt;&gt; &amp;gt;
&gt;&gt; &amp;gt; val consumer = new FlinkKafkaConsumer[String](topic,new 
KafkaStringSchema,consumerProps).setStartFromLatest()
&gt;&gt; &amp;gt; Env.addSource(consumer).print()
&gt;&gt; &amp;gt; 
Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka
 ??consumer 
group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11
 ????

回复