Hello teacher, and thank you for taking time out of your busy schedule to reply. What I observe is this: while data is flowing in, each checkpoint's state size is a few hundred KB larger than the previous checkpoint's, and as long as data keeps flowing in, the state size keeps growing. When no data is flowing in, the checkpoint state size stays flat; once data starts flowing in again, the state size continues to grow from where it left off.
Data stream:

    SingleOutputStreamOperator<StudentAggResult> studentSubjectStream = dataStream
            .filter(new Question2SubjectFilter())
            .keyBy(new TaskStudentSubjectKeySelector())
            .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
            .aggregate(new StudentSubjectScoreAgg());

    studentSubjectStream.addSink(getKafkaProducer(KafkaTopic.STUDENT_SUBJECT_AGG.getTopic(), StudentAggResult.class));

Aggregate function:

    public abstract class BaseAgg<T, R extends IMergeable> implements AggregateFunction<T, R, R> {

        public abstract R create(T input);

        public abstract void merge(R aggResult, T t);

        @Override
        public R createAccumulator() {
            return null;
        }

        @Override
        public R add(T t, R aggResult) {
            if (aggResult == null) {
                aggResult = create(t);
            }
            merge(aggResult, t);
            return aggResult;
        }

        @Override
        public R getResult(R aggResult) {
            return aggResult;
        }

        @Override
        public R merge(R aggResult, R acc1) {
            if (acc1 == null) {
                return aggResult;
            }
            if (aggResult == null) {
                return acc1;
            }
            aggResult.merge(acc1);
            return aggResult;
        }
    }

Checkpoint config:

    env.enableCheckpointing(5000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

State is stored via RocksDB.

--
Sent from: http://apache-flink.147419.n8.nabble.com/
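To reason about the null-accumulator handling in BaseAgg in isolation, here is a minimal standalone sketch with Flink's AggregateFunction interface stubbed out. The IntSum accumulator and SumAgg subclass are hypothetical, purely for illustration; they are not part of the original job:

```java
// Standalone sketch of BaseAgg's null-accumulator logic (Flink interfaces stubbed out).
// IntSum and SumAgg are hypothetical illustration classes, not from the original job.

interface IMergeable<R> {
    void merge(R other);
}

abstract class BaseAgg<T, R extends IMergeable<R>> {
    public abstract R create(T input);
    public abstract void merge(R aggResult, T t);

    // Mirrors the post's createAccumulator(): the accumulator starts out null.
    public R createAccumulator() {
        return null;
    }

    // Lazily creates the accumulator on the first element, then folds the element in.
    public R add(T t, R aggResult) {
        if (aggResult == null) {
            aggResult = create(t);
        }
        merge(aggResult, t);
        return aggResult;
    }

    public R getResult(R aggResult) {
        return aggResult;
    }

    // Null-tolerant pairwise merge of two accumulators.
    public R merge(R aggResult, R acc1) {
        if (acc1 == null) {
            return aggResult;
        }
        if (aggResult == null) {
            return acc1;
        }
        aggResult.merge(acc1);
        return aggResult;
    }
}

// Hypothetical accumulator: a running integer sum.
class IntSum implements IMergeable<IntSum> {
    int sum;
    public void merge(IntSum other) {
        sum += other.sum;
    }
}

// Hypothetical concrete aggregate: create() returns an empty accumulator,
// so the first element is counted exactly once by add().
class SumAgg extends BaseAgg<Integer, IntSum> {
    public IntSum create(Integer input) {
        return new IntSum();
    }
    public void merge(IntSum acc, Integer t) {
        acc.sum += t;
    }
}

public class AggSketch {
    public static void main(String[] args) {
        SumAgg agg = new SumAgg();
        IntSum acc = agg.createAccumulator();           // null, as in the post
        acc = agg.add(1, acc);                          // accumulator lazily created here
        acc = agg.add(2, acc);
        IntSum other = agg.add(4, agg.createAccumulator());
        acc = agg.merge(acc, other);
        System.out.println(agg.getResult(acc).sum);     // prints 7
    }
}
```

In the actual job, BaseAgg additionally implements Flink's AggregateFunction&lt;T, R, R&gt;; that interface is omitted here so the sketch compiles without Flink on the classpath.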