public class ExecutionEnvUtil {
public static ParameterTool createParameterTool(final String[] args) throws
Exception {
return ParameterTool
.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(PropertiesConstants.PROPERTIES_FILE_NAME))
.mergeWith(ParameterTool.fromArgs(args))
.mergeWith(ParameterTool.fromSystemProperties());
}
private static ParameterTool createParameterTool() {
try {
return ParameterTool
.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(PropertiesConstants.PROPERTIES_FILE_NAME))
.mergeWith(ParameterTool.fromSystemProperties());
} catch (IOException e) {
e.printStackTrace();
}
return ParameterTool.fromSystemProperties();
}
public static StreamExecutionEnvironment prepare(ParameterTool parameterTool)
throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
60000));
env.getConfig().setGlobalJobParameters(parameterTool);
env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
return env;
}
}
--
Best Wishes
Galen.K
在 2020-04-07 12:56:37,"酷酷的浑蛋" <[email protected]> 写道:
>ExecutionEnvUtil
>这个没有
>
>
>| |
>apache22
>|
>|
>[email protected]
>|
>签名由网易邮箱大师定制
>在2020年4月7日 12:23,苟刚<[email protected]> 写道:
>
>附件是两份主要代码
>
>
>
>
>
>
>
>
>
>
>
>
>
>--
>
>Best Wishes
> Galen.K
>
>
>
>
>
>在 2020-04-07 12:11:07,"酷酷的浑蛋" <[email protected]> 写道:
>>是不是代码中设置了从头消费,还有可能提交offset到kafka的代码中设置了false?因为你的代码应该不是全的,所以没法具体看
>>
>>
>>| |
>>apache22
>>|
>>|
>>[email protected]
>>|
>>签名由网易邮箱大师定制
>>在2020年4月7日 12:03,苟刚<[email protected]> 写道:
>>
>>
>>
>>latest 不是最后消费的位置吗?
>>另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best Wishes
>>Galen.K
>>
>>
>>
>>
>>
>>在 2020-04-07 11:39:03,"sunfulin" <[email protected]> 写道:
>>
>>
>>
>>Hi,
>>props.put("auto.offset.reset", "latest");
>>是加了这个设置导致的吧
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>在 2020-04-07 11:27:53,"苟刚" <[email protected]> 写道:
>>Hello,
>>我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
>>我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
>>
>>
>>flink版本:1.6.3
>>
>>部分代码如下:
>>
>>public static void main(String[] args) throws Exception {
>>final ParameterTool parameterTool =
>>ExecutionEnvUtil.createParameterTool(args);
>>StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>>
>>DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>>// 处理timing数据
>>processTimingData(parameterTool, data);
>>// 处理front error数据
>>processFrontErrorData(parameterTool, data);
>>// 处理img error数据
>>processImgLoadErrorData(parameterTool, data);
>>env.execute("xlog compute");
>>}
>>
>>
>>
>>
>>kafka的连接参数配置:
>>public static Properties buildKafkaProps(ParameterTool parameterTool) {
>>Properties props = parameterTool.getProperties();
>>props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS,
>>DEFAULT_KAFKA_BROKERS));
>>props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT,
>>DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>>props.put("group.id", parameterTool.get(KAFKA_GROUP_ID,
>>DEFAULT_KAFKA_GROUP_ID));
>>props.put("key.deserializer",
>>"org.apache.kafka.common.serialization.StringDeserializer");
>>props.put("value.deserializer",
>>"org.apache.kafka.common.serialization.StringDeserializer");
>>props.put("auto.offset.reset", "latest");
>>return props;
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best Wishes
>>Galen.K
>>
>
>
>
>
>
>