Hello, and first of all thank you for taking the time to reply.
What I am observing is this: while data is flowing in, each checkpoint's state
size is a few hundred KB larger than the previous checkpoint's. As long as data
keeps flowing, the state size keeps growing. When no data is flowing in, the
checkpoint state size stays flat; once data starts flowing again, the state
size resumes growing from where it left off.

Data stream:
SingleOutputStreamOperator<StudentAggResult> studentSubjectStream = dataStream
        .filter(new Question2SubjectFilter())
        .keyBy(new TaskStudentSubjectKeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(new StudentSubjectScoreAgg());

studentSubjectStream.addSink(getKafkaProducer(
        KafkaTopic.STUDENT_SUBJECT_AGG.getTopic(), StudentAggResult.class));
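Since the tumbling window (5 s) and the checkpoint interval (5 s, below) are the same length, a checkpoint will routinely capture a window whose accumulator is still live in keyed state. As a side note, window assignment for a 5 s tumbling window works roughly as follows; this is a simplified sketch of Flink's `TimeWindow.getWindowStartWithOffset` logic (assuming non-negative timestamps and zero window offset; `windowStart` is a hypothetical helper name, not a Flink API):

```java
public class WindowStartSketch {
    // Simplified from Flink's TimeWindow.getWindowStartWithOffset:
    // assumes non-negative timestamps and a zero window offset.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        // An element arriving at t = 12,345 ms falls into the [10000, 15000) window.
        System.out.println(windowStart(12_345L, 5_000L)); // prints 10000
    }
}
```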

Aggregate function:
public abstract class BaseAgg<T, R extends IMergeable>
        implements AggregateFunction<T, R, R> {

    public abstract R create(T input);
    public abstract void merge(R aggResult, T t);

    @Override
    public R createAccumulator() {
        return null;
    }

    @Override
    public R add(T t, R aggResult) {
        if (aggResult == null) {
            aggResult = create(t);
        }
        merge(aggResult, t);
        return aggResult;
    }

    @Override
    public R getResult(R aggResult) {
        return aggResult;
    }

    @Override
    public R merge(R aggResult, R acc1) {
        if (acc1 == null) {
            return aggResult;
        }
        if (aggResult == null) {
            return acc1;
        }
        aggResult.merge(acc1);
        return aggResult;
    }
}
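To make the accumulator lifecycle concrete, here is a self-contained plain-Java sketch of how `add()` behaves (hypothetical names, Flink types stripped). One thing worth noting: because `add()` calls `merge()` immediately after `create()`, `create()` is assumed to return an empty accumulator, otherwise the first element would be counted twice.

```java
// Hypothetical accumulator, standing in for an IMergeable implementation.
class ScoreAcc {
    long count;
    double sum;

    void merge(ScoreAcc other) {
        count += other.count;
        sum += other.sum;
    }
}

// Mirrors the add()/create()/merge() flow of BaseAgg without Flink.
class MiniScoreAgg {
    ScoreAcc create(double firstInput) {
        // Must be empty: add() merges the same input right after create().
        return new ScoreAcc();
    }

    void merge(ScoreAcc acc, double score) {
        acc.count++;
        acc.sum += score;
    }

    ScoreAcc add(double score, ScoreAcc acc) {
        if (acc == null) {
            acc = create(score);
        }
        merge(acc, score);
        return acc;
    }
}
```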

Checkpoint configuration:
    env.enableCheckpointing(5000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

State is stored via RocksDB.
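For completeness, the backend setup typically looks something like the following. This is a sketch, not the actual code from the job: the `EmbeddedRocksDBStateBackend` class, the incremental flag, and the checkpoint path are assumptions, and the exact API depends on the Flink version. Whether incremental checkpoints are enabled matters here, because with incremental RocksDB checkpoints the reported checkpoint size reflects uploaded SST files rather than the logical state size.

```java
// Sketch only. Flink 1.13+ style; older releases use
// new RocksDBStateBackend(checkpointPath, /*incremental=*/ true) instead.
env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints"); // hypothetical path
```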



--
Sent from: http://apache-flink.147419.n8.nabble.com/
