Hello,
    
我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
    我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。


flink版本:1.6.3

部分代码如下:

public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

    DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
// 处理timing数据
processTimingData(parameterTool, data);
// 处理front error数据
processFrontErrorData(parameterTool, data);
// 处理img error数据
processImgLoadErrorData(parameterTool, data);
    env.execute("xlog compute");
}




kafka的连接参数配置:
public static Properties buildKafkaProps(ParameterTool parameterTool) {
    Properties props = parameterTool.getProperties();
    props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, 
DEFAULT_KAFKA_BROKERS));
    props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, 
DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
    props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, 
DEFAULT_KAFKA_GROUP_ID));
    props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
    props.put("auto.offset.reset", "latest");
return props;
}







--

Best Wishes
   Galen.K

回复