Hi all
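I've run into a problem and would appreciate some help.
I set the state storage location with env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints")); but after the job starts I cannot find any state data.
I'm running Flink 1.12 in YARN per-job mode; my configuration is below. After the job starts, the directory "/data/flink/checkpoints" does not exist on the server. Since I configured a storage location for state, shouldn't state data appear there as soon as the job is running?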
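import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTestDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Trigger a checkpoint every 60 seconds
        env.enableCheckpointing(60000);
        env.getConfig().setAutoWatermarkInterval(200);
        // RocksDB state backend with the checkpoint directory given as a file:// URI
        env.setStateBackend(new RocksDBStateBackend("file:///data/flink/checkpoints"));

        EnvironmentSettings bsSettings =
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
        bsTableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);

        // Retain externalized checkpoints when the job is cancelled
        CheckpointConfig config = env.getCheckpointConfig();
        config.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Checkpoint interval set again through the table configuration (5 minutes)
        bsTableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMinutes(5));

        // Mini-batch settings for the Table/SQL runtime
        Configuration configuration = bsTableEnv.getConfig().getConfiguration();
        configuration.setString("table.exec.mini-batch.enabled", "true");
        configuration.setString("table.exec.mini-batch.allow-latency", "6000");
        configuration.setString("table.exec.mini-batch.size", "5000");
        // ...
    }
}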
刘海