Hi 苟刚:
另外,看了你的计算逻辑,是同一份数据源里面,有不同类型的数据需要分开处理,你可以尝试使用Flink的侧输出来做分流处理,这样逻辑更清晰,而且程序效率也会高许多
| |
ss
|
|
[email protected]
|
签名由网易邮箱大师定制
在2020年4月7日 19:54,LakeShen<[email protected]> 写道:
Hi 苟刚,
Flink 任务中,如果开启 Checkpoint 的话,会在每次Checkpoint
完成后,提交偏移量。如果没有开启的话,就是根据自动提交来提交偏移量,默认是开启的,间隔是 5 s.
至于你说每次都是重头开始的,我个人的想法是不是在代码中设置了从最早开始消费,也就是 你使用到了这个方法:setStartFromEarliest[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
Best,
LakeShen
gang.gou <[email protected]> 于2020年4月7日周二 下午4:17写道:
好的,我试一下,有结果了同步大家,谢谢!
在 2020/4/7 下午3:52,“Evan”<user-zh-return-2826-gougang_1991=
[email protected] 代表 [email protected]> 写入:
之前的代码好像乱码了,我设置了一下,重新发一下,建议你
在获取consumer之后,再设置一下 consumer.setStartFromLatest();,这样设置的参考就是官网文档介绍的,这是我之前翻译的,可以看一下后边关于【Kafka
Consumers 从指定位置开始消费】的解释,链接:https://www.jianshu.com/p/b753527b91a6
/**
* @param env
* @param topic
* @param time 订阅的时间
* @return
* @throws IllegalAccessException
*/
public static DataStreamSource<XlogStreamBasicBean>
buildSource(StreamExecutionEnvironment env, String topic, Long time) throws
IllegalAccessException {
ParameterTool parameterTool =
(ParameterTool) env.getConfig().getGlobalJobParameters();
Properties props =
buildKafkaProps(parameterTool);
FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new
FlinkKafkaConsumer011<>(
topic,
new
MetricSchema(),
props);
consumer.setStartFromLatest();
consumer.assignTimestampsAndWatermarks(new
AscendingTimestampExtractor<XlogStreamBasicBean>() {
@Override
public long
extractAscendingTimestamp(XlogStreamBasicBean element) {
if (element ==
null || element.getTimestamp() == null) {
return System.currentTimeMillis();
}
return
element.getTimestamp() - 10000;
}
});
return env.addSource(consumer);
}
}
------------------ 原始邮件 ------------------
发件人: "苟刚"<[email protected]>;
发送时间: 2020年4月7日(星期二) 中午11:27
收件人: "user-zh"<[email protected]>;
主题: fink新增计算逻辑时kafka从头开始追平消费记录
Hello,
我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
flink版本:1.6.3
部分代码如下:
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool =
ExecutionEnvUtil.createParameterTool(args);
StreamExecutionEnvironment env =
ExecutionEnvUtil.prepare(parameterTool);
DataStreamSource<XlogStreamBasicBean> data =
KafkaTools.buildSource(env);
// 处理timing数据
processTimingData(parameterTool, data);
// 处理front error数据
processFrontErrorData(parameterTool, data);
// 处理img error数据
processImgLoadErrorData(parameterTool, data);
env.execute("xlog compute");
}
kafka的连接参数配置:
public static Properties buildKafkaProps(ParameterTool parameterTool) {
Properties props = parameterTool.getProperties();
props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
DEFAULT_KAFKA_BROKERS));
props.put("zookeeper.connect",
parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
DEFAULT_KAFKA_GROUP_ID));
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
return props;
}
--
Best Wishes
Galen.K