为了保证exactly-once,flink自己通过barrier来实现checkpoint,包括barrier的传递等等,所以flink在kafkaconsumer的基础之上,封装了一层语义保障。

在 2020/9/4 10:34, Shuiqiang Chen 写道:

Hi,
为了保证 Flink 程序的 exactly-once,必须由各个 Kafka source  算子维护当前算子所消费的 partition 消费 
offset 信息,并在每次checkpoint 时将这些信息写入到 state 中, 在从 checkpoint 恢复中从上次 commit 
的位点开始消费,保证 exactly-once.  如果用 Kafka 消费组管理,那么 FlinkKafkaConsumer 内各个并发实例所分配的 
partition 将由 Kafka 的消费组管理,且 offset 也由 Kafka 消费组管理者记录,Flink 无法维护这些信息。

在 2020年9月4日,上午10:25,op <[email protected]> 写道:

谢谢,但我还是不太明白,FlinkKafkaConsumer底层不是用的KafkaConsumer吗,为什么flink不用kafka的消费组管理呢?&nbsp;


------------------&nbsp;原始邮件&nbsp;------------------
发件人:                                                                                             
                           "user-zh"                                                   
                                 <[email protected]&gt;;
发送时间:&nbsp;2020年9月3日(星期四) 晚上6:09
收件人:&nbsp;"user-zh"<[email protected]&gt;;

主题:&nbsp;Re: FlinkKafkaConsumer问题



Hi op,

在 Flink 消费 Kafka 的过程中, 由 FlinkKafkaConsumer 会从 Kafka 中拿到当前 topic 的所有 partition 
信息并分配给个并发消费,这里的 group id 只是用于将当前 partition 的消费 offset commit 到 
Kafka,并用这个消费组标识。而使用 KafkaConsumer 消费数据则应用到了 Kafka 的消费组管理, 这是 Kafka 服务端的一个角色。

另外,当启动两个作业用同一个 topic 和 group id 消费 kafka, 如果两个作业会分别以同一个 group id commit offset 
到kafka, 如果以 group offset 消费模式启动作业, 则会以最后一次 commit 的 offset 开始消费。

&gt; 在 2020年9月3日,下午3:03,op <[email protected]&gt; 写道:
&gt;
&gt; &amp;nbsp; &amp;nbsp; hi,&amp;nbsp; &amp;nbsp; 
我对FlinkKafkaConsumer的实现有点迷惑,&amp;nbsp; &amp;nbsp; 这有两个相同代码的程序:
&gt; //---------------------------
&gt; val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
&gt; Env.setRestartStrategy(RestartStrategies.noRestart())
&gt; val consumerProps = new Properties()
&gt; consumerProps.put("bootstrap.servers", brokers)
&gt; consumerProps.put("group.id", "test1234")
&gt;
&gt; val consumer = new FlinkKafkaConsumer[String](topic,new 
KafkaStringSchema,consumerProps).setStartFromLatest()
&gt; Env.addSource(consumer).print()
&gt; 
Env.execute()//-----------------------------------我同时启动这两个程序,他们连接相同的集群的topic,group.id也一样,然后我向topic发送一些数据,发现这两个程序都能消费到发送的所有分区的消息,kafka
 的consumer 
group组内应该是有消费隔离的,为什么这里两个程序都能同时消费到全部数据呢?而用KafkaConsumer写两个相同的程序去消费这个topic就可以看到两边程序是没有重复消费同一分区的。我用的是flink1.11flink-connector-kafka_2.11
 谢谢

回复