苟刚你好,刚才看了你的kafka消费代码,建议你在获取consumer后,增加一行如下代码
“consumer.setStartFromLatest();”然后再测试一下。
/**
* @param env
* @param topic
* @param time 订阅的时间
* @return
* @throws IllegalAccessException
*/
public static DataStreamSource<XlogStreamBasicBean>
buildSource(StreamExecutionEnvironment env, String topic, Long time) throws
IllegalAccessException {
ParameterTool parameterTool = (ParameterTool)
env.getConfig().getGlobalJobParameters();
Properties props = buildKafkaProps(parameterTool);
FlinkKafkaConsumer011<XlogStreamBasicBean>
consumer = new FlinkKafkaConsumer011<>(
topic,
new MetricSchema(),
props);
consumer.setStartFromLatest();
consumer.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<XlogStreamBasicBean>() {
@Override
public long
extractAscendingTimestamp(XlogStreamBasicBean element) {
if (element == null ||
element.getTimestamp() == null) {
return
System.currentTimeMillis();
}
return
element.getTimestamp() - 10000;
}
});
return env.addSource(consumer);
}
}
------------------ 原始邮件 ------------------
发件人: "gang.gou"<[email protected]>;
发送时间: 2020年4月7日(星期二) 下午2:32
收件人: "user-zh"<[email protected]>;
主题: Re: 回复:fink新增计算逻辑时kafka从头开始追平消费记录
我的kafka版本是0.11;flink版本是1.6.3;我没有显示设置offset自动提交,但是看kafka的官网文档,默认应该是true
在 2020/4/7
下午2:10,“Evan”<[email protected] 代表
[email protected]> 写入:
苟刚你好:请问你使用的kafka是什么版本的,还有就是有没有设置offset自动提交
------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"苟刚"<[email protected]&gt;;
发送时间:&nbsp;2020年4月7日(星期二) 中午11:27
收件人:&nbsp;"user-zh"<[email protected]&gt;;
主题:&nbsp;fink新增计算逻辑时kafka从头开始追平消费记录
Hello,
&nbsp;&nbsp;&nbsp;
我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
&nbsp;&nbsp;&nbsp;
我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
flink版本:1.6.3
部分代码如下:
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool =
ExecutionEnvUtil.createParameterTool(args);
&nbsp;&nbsp;&nbsp; StreamExecutionEnvironment
env = ExecutionEnvUtil.prepare(parameterTool);
&nbsp;&nbsp;&nbsp;
DataStreamSource<XlogStreamBasicBean&gt; data = KafkaTools.buildSource(env);
// 处理timing数据
processTimingData(parameterTool, data);
// 处理front error数据
processFrontErrorData(parameterTool, data);
// 处理img error数据
processImgLoadErrorData(parameterTool, data);
&nbsp;&nbsp;&nbsp; env.execute("xlog compute");
}
kafka的连接参数配置:
public static Properties buildKafkaProps(ParameterTool
parameterTool) {
&nbsp;&nbsp;&nbsp; Properties props =
parameterTool.getProperties();
&nbsp;&nbsp;&nbsp;
props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
DEFAULT_KAFKA_BROKERS));
&nbsp;&nbsp;&nbsp;
props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
&nbsp;&nbsp;&nbsp; props.put("group.id",
parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
&nbsp;&nbsp;&nbsp; props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
&nbsp;&nbsp;&nbsp;
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
&nbsp;&nbsp;&nbsp;
props.put("auto.offset.reset", "latest");
return props;
}
--
Best Wishes
&nbsp;&nbsp; Galen.K