我的kafka版本是0.11;flink版本是1.6.3;我没有显示设置offset自动提交,但是看kafka的官网文档,默认应该是true




在 2020/4/7 
下午2:10,“Evan”<[email protected] 代表 
[email protected]> 写入:

    苟刚你好:请问你使用的kafka是什么版本的,还有就是有没有设置offset自动提交
    
    
    
    
    
    
    ------------------&nbsp;原始邮件&nbsp;------------------
    发件人:&nbsp;"苟刚"<[email protected]&gt;;
    发送时间:&nbsp;2020年4月7日(星期二) 中午11:27
    收件人:&nbsp;"user-zh"<[email protected]&gt;;
    
    主题:&nbsp;fink新增计算逻辑时kafka从头开始追平消费记录
    
    
    
    Hello,
    &nbsp;&nbsp;&nbsp; 
我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
    &nbsp;&nbsp;&nbsp; 
我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
    
    
    flink版本:1.6.3
    
    部分代码如下:
    
    public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = 
ExecutionEnvUtil.createParameterTool(args);
    &nbsp;&nbsp;&nbsp; StreamExecutionEnvironment env = 
ExecutionEnvUtil.prepare(parameterTool);
    
    &nbsp;&nbsp;&nbsp; DataStreamSource<XlogStreamBasicBean&gt; data = 
KafkaTools.buildSource(env);
    // 处理timing数据
    processTimingData(parameterTool, data);
    // 处理front error数据
    processFrontErrorData(parameterTool, data);
    // 处理img error数据
    processImgLoadErrorData(parameterTool, data);
    &nbsp;&nbsp;&nbsp; env.execute("xlog compute");
    }
    
    
    
    
    kafka的连接参数配置:
    public static Properties buildKafkaProps(ParameterTool parameterTool) {
    &nbsp;&nbsp;&nbsp; Properties props = parameterTool.getProperties();
    &nbsp;&nbsp;&nbsp; props.put("bootstrap.servers", 
parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
    &nbsp;&nbsp;&nbsp; props.put("zookeeper.connect", 
parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
    &nbsp;&nbsp;&nbsp; props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, 
DEFAULT_KAFKA_GROUP_ID));
    &nbsp;&nbsp;&nbsp; props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    &nbsp;&nbsp;&nbsp; props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    &nbsp;&nbsp;&nbsp; props.put("auto.offset.reset", "latest");
    return props;
    }
    
    
    
    
    
    
    
    --
    
    Best Wishes
    &nbsp;&nbsp; Galen.K

回复