Hi 苟刚, Flink 任务中,如果开启 Checkpoint 的话,会在每次Checkpoint 完成后,提交偏移量。如果没有开启的话,就是根据自动提交来提交偏移量,默认是开启的,间隔是 5 s. 至于你说每次都是重头开始的,我个人的想法是不是在代码中设置了从最早开始消费,也就是 你使用到了这个方法:setStartFromEarliest[1]
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration Best, LakeShen gang.gou <[email protected]> 于2020年4月7日周二 下午4:17写道: > 好的,我试一下,有结果了同步大家,谢谢! > > 在 2020/4/7 下午3:52,“Evan”<user-zh-return-2826-gougang_1991= > [email protected] 代表 [email protected]> 写入: > > 之前的代码好像乱码了,我设置了一下,重新发一下,建议你 > 在获取consumer之后,再设置一下 consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka > Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6 > > > > /** > * @param env > * @param topic > * @param time 订阅的时间 > * @return > * @throws IllegalAccessException > */ > public static DataStreamSource<XlogStreamBasicBean> > buildSource(StreamExecutionEnvironment env, String topic, Long time) throws > IllegalAccessException { > ParameterTool parameterTool = > (ParameterTool) env.getConfig().getGlobalJobParameters(); > Properties props = > buildKafkaProps(parameterTool); > > FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new > FlinkKafkaConsumer011<>( > topic, > new > MetricSchema(), > props); > > > consumer.setStartFromLatest(); > > > consumer.assignTimestampsAndWatermarks(new > AscendingTimestampExtractor<XlogStreamBasicBean>() { > @Override > public long > extractAscendingTimestamp(XlogStreamBasicBean element) { > if (element == > null || element.getTimestamp() == null) { > > return System.currentTimeMillis(); > } > return > element.getTimestamp() - 10000; > } > }); > return env.addSource(consumer); > } > > > } > > > > > > ------------------ 原始邮件 ------------------ > 发件人: "苟刚"<[email protected]>; > 发送时间: 2020年4月7日(星期二) 中午11:27 > 收件人: "user-zh"<[email protected]>; > > 主题: fink新增计算逻辑时kafka从头开始追平消费记录 > > > > Hello, > > > 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗? > 我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。 > > > flink版本:1.6.3 > > 部分代码如下: > > public static void main(String[] args) throws Exception { > final ParameterTool parameterTool = > ExecutionEnvUtil.createParameterTool(args); > StreamExecutionEnvironment env = > ExecutionEnvUtil.prepare(parameterTool); > > DataStreamSource<XlogStreamBasicBean> data = > KafkaTools.buildSource(env); > // 处理timing数据 > processTimingData(parameterTool, data); > // 处理front error数据 > processFrontErrorData(parameterTool, data); > // 处理img error数据 > processImgLoadErrorData(parameterTool, data); > env.execute("xlog compute"); > } > > > > > kafka的连接参数配置: > public static Properties buildKafkaProps(ParameterTool parameterTool) { > Properties props = parameterTool.getProperties(); > props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, > DEFAULT_KAFKA_BROKERS)); > props.put("zookeeper.connect", > parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, > DEFAULT_KAFKA_ZOOKEEPER_CONNECT)); > props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, > DEFAULT_KAFKA_GROUP_ID)); > props.put("key.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("value.deserializer", > "org.apache.kafka.common.serialization.StringDeserializer"); > props.put("auto.offset.reset", "latest"); > return props; > } > > > > > > > > -- > > Best Wishes > Galen.K > > >
