offset 是最后消费的位置,latest 针对的是kafka topic,对应是的 latest 的消息。 savepoint 内部就是 从offset开始读的。
[email protected] 发件人: 苟刚 发送时间: 2020-04-07 12:03 收件人: user-zh 主题: Re:Re:fink新增计算逻辑时kafka从头开始追平消费记录 latest 不是最后消费的位置吗? 另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。 -- Best Wishes Galen.K 在 2020-04-07 11:39:03,"sunfulin" <[email protected]> 写道: > > > >Hi, >props.put("auto.offset.reset", "latest"); >是加了这个设置导致的吧 > > > > > > > > > > > > > > >在 2020-04-07 11:27:53,"苟刚" <[email protected]> 写道: >>Hello, >> >> 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗? >> 我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。 >> >> >>flink版本:1.6.3 >> >>部分代码如下: >> >>public static void main(String[] args) throws Exception { >>final ParameterTool parameterTool = >>ExecutionEnvUtil.createParameterTool(args); >> StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool); >> >> DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env); >>// 处理timing数据 >>processTimingData(parameterTool, data); >>// 处理front error数据 >>processFrontErrorData(parameterTool, data); >>// 处理img error数据 >>processImgLoadErrorData(parameterTool, data); >> env.execute("xlog compute"); >>} >> >> >> >> >>kafka的连接参数配置: >>public static Properties buildKafkaProps(ParameterTool parameterTool) { >> Properties props = parameterTool.getProperties(); >> props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, >> DEFAULT_KAFKA_BROKERS)); >> props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, >> DEFAULT_KAFKA_ZOOKEEPER_CONNECT)); >> props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, >> DEFAULT_KAFKA_GROUP_ID)); >> props.put("key.deserializer", >> "org.apache.kafka.common.serialization.StringDeserializer"); >> props.put("value.deserializer", >> "org.apache.kafka.common.serialization.StringDeserializer"); >> props.put("auto.offset.reset", "latest"); >>return props; >>} >> >> >> >> >> >> >> >>-- >> >>Best Wishes >> Galen.K >>
