???????? ????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??9??4??(??????) ????11:54
??????: "user-zh"<[email protected]>;"Shuiqiang
Chen"<[email protected]>;
????: Re: FlinkKafkaConsumer????
????????exactly-once??flink????????barrier??????checkpoint??????barrier????????????????flink??kafkaconsumer????????????????????????????????
?? 2020/9/4 10:34, Shuiqiang Chen ????:
> Hi??
> ???????? Flink ?????? exactly-once???????????? Kafka source
???????????????????????? partition ???? offset ??????????????checkpoint
?????????????????? state ???? ???? checkpoint ???????????? commit
???????????????????? exactly-once. ?????? Kafka ????????????????
FlinkKafkaConsumer ?????????????????????? partition ???? Kafka ????????????????
offset ???? Kafka ??????????????????Flink ??????????????????
>
>> ?? 2020??9??4????????10:25??op <[email protected]> ??????
>>
>>
????????????????????????FlinkKafkaConsumer????????????KafkaConsumer??????????flink????kafka????????????????&nbsp;
>>
>>
>> ------------------&nbsp;????????&nbsp;------------------
>>
??????:
"user-zh"
<[email protected]&gt;;
>> ????????:&nbsp;2020??9??3??(??????) ????6:09
>> ??????:&nbsp;"user-zh"<[email protected]&gt;;
>>
>> ????:&nbsp;Re: FlinkKafkaConsumer????
>>
>>
>>
>> Hi op,
>>
>> ?? Flink ???? Kafka ?????????? ?? FlinkKafkaConsumer ???? Kafka
?????????? topic ?????? partition ?????????????????????????????? group id
?????????????? partition ?????? offset commit ??
Kafka???????????????????????????? KafkaConsumer ?????????????????? Kafka
????????????, ???? Kafka ??????????????????
>>
>> ???????????????????????????? topic ?? group id ???? kafka??
?????????????????????????? group id commit offset ??kafka?? ?????? group offset
?????????????????? ?????????????? commit ?? offset ??????????
>>
>> &gt; ?? 2020??9??3????????3:03??op <[email protected]&gt; ??????
>> &gt;
>> &gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp;
&amp;nbsp; ????FlinkKafkaConsumer????????????????&amp;nbsp;
&amp;nbsp; ????????????????????????
>> &gt; //---------------------------
>> &gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> &gt; Env.setRestartStrategy(RestartStrategies.noRestart())
>> &gt; val consumerProps = new Properties()
>> &gt; consumerProps.put("bootstrap.servers", brokers)
>> &gt; consumerProps.put("group.id", "test1234")
>> &gt;
>> &gt; val consumer = new FlinkKafkaConsumer[String](topic,new
KafkaStringSchema,consumerProps).setStartFromLatest()
>> &gt; Env.addSource(consumer).print()
>> &gt;
Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka
??consumer
group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11
????