latest 不是最后消费的位置吗?
另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。
--
Best Wishes
Galen.K
在 2020-04-07 11:39:03,"sunfulin" <[email protected]> 写道:
>
>
>
>Hi,
>props.put("auto.offset.reset", "latest");
>是加了这个设置导致的吧
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-07 11:27:53,"苟刚" <[email protected]> 写道:
>>Hello,
>>
>> 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
>> 我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
>>
>>
>>flink版本:1.6.3
>>
>>部分代码如下:
>>
>>public static void main(String[] args) throws Exception {
>>final ParameterTool parameterTool =
>>ExecutionEnvUtil.createParameterTool(args);
>> StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>>
>> DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>>// 处理timing数据
>>processTimingData(parameterTool, data);
>>// 处理front error数据
>>processFrontErrorData(parameterTool, data);
>>// 处理img error数据
>>processImgLoadErrorData(parameterTool, data);
>> env.execute("xlog compute");
>>}
>>
>>
>>
>>
>>kafka的连接参数配置:
>>public static Properties buildKafkaProps(ParameterTool parameterTool) {
>> Properties props = parameterTool.getProperties();
>> props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
>> DEFAULT_KAFKA_BROKERS));
>> props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
>> DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>> props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
>> DEFAULT_KAFKA_GROUP_ID));
>> props.put("key.deserializer",
>> "org.apache.kafka.common.serialization.StringDeserializer");
>> props.put("value.deserializer",
>> "org.apache.kafka.common.serialization.StringDeserializer");
>> props.put("auto.offset.reset", "latest");
>>return props;
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best Wishes
>> Galen.K
>>