????????????????????????FlinkKafkaConsumer????????????KafkaConsumer??????????flink????kafka????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??9??3??(??????) ????6:09
??????: "user-zh"<[email protected]>;
????: Re: FlinkKafkaConsumer????
Hi op,
?? Flink ???? Kafka ?????????? ?? FlinkKafkaConsumer ???? Kafka ??????????
topic ?????? partition ?????????????????????????????? group id ??????????????
partition ?????? offset commit ?? Kafka????????????????????????????
KafkaConsumer ?????????????????? Kafka ????????????, ???? Kafka
??????????????????
???????????????????????????? topic ?? group id ???? kafka??
?????????????????????????? group id commit offset ??kafka?? ?????? group offset
?????????????????? ?????????????? commit ?? offset ??????????
> ?? 2020??9??3????????3:03??op <[email protected]> ??????
>
> &nbsp; &nbsp; hi,&nbsp; &nbsp;
????FlinkKafkaConsumer????????????????&nbsp; &nbsp;
????????????????????????
> //---------------------------
> val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
> Env.setRestartStrategy(RestartStrategies.noRestart())
> val consumerProps = new Properties()
> consumerProps.put("bootstrap.servers", brokers)
> consumerProps.put("group.id", "test1234")
>
> val consumer = new FlinkKafkaConsumer[String](topic,new
KafkaStringSchema,consumerProps).setStartFromLatest()
> Env.addSource(consumer).print()
>
Env.execute()//-----------------------------------??????????????????????????????????????????topic??group.id????????????????topic????????????????????????????????????????????????????????????kafka
??consumer
group????????????????????????????????????????????????????????????????????????KafkaConsumer??????????????????????????topic????????????????????????????????????????????????????flink1.11flink-connector-kafka_2.11
????