Hi All

This My Coding:

statDataStream
  .map(new InnerStatMap(logType))
    .uid("InnerStatMap").name("InnerStatMap")
  .keyBy(new InnerKeySelector)
  .timeWindow(Time.seconds(statTimeWindow))
  .reduce(new InnerStatReduce)
    .uid("InnerReduce").name("InnerReduce")
  .addSink(innerStatProducer)
    .uid("InnerSink").name("InnerSink")
  .setParallelism(sinkParallelism)


I find checkpoint state is becoming increasingly over time

best wishes

回复