Hi,

I just tried out checkpointing with the FsStateBackend in 1.3.2, and everything works as expected for me. Can you give a bit more detail on what you mean by "checkpoint data is not cleaning"? For example, is it not cleaned up while the job is running, so that "chk-[ID]" directories accumulate, or is something left over across multiple restarts? Which filesystem are you using for the checkpoints, e.g. local, HDFS, S3, …? Does this also happen for other jobs?
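One quick way to answer the first question for a local (or locally mounted) checkpoint directory is to list the chk-<ID> subdirectories while the job is running. Below is a minimal sketch. `CheckpointDirCheck` is a hypothetical helper, not part of Flink. It assumes the FsStateBackend layout `<checkpoint URI>/<job ID>/chk-<ID>`, and the default path in `main` is only a placeholder. For HDFS or S3 you would use that filesystem's own listing tools instead.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Hypothetical diagnostic helper (not part of Flink): lists the chk-<ID>
 * directories under one job's checkpoint directory on a local filesystem.
 */
public class CheckpointDirCheck {

    private static final Pattern CHK = Pattern.compile("chk-(\\d+)");

    /** Returns the checkpoint IDs of all chk-<ID> directories, in ascending order. */
    static List<Long> checkpointIds(Path jobCheckpointDir) throws IOException {
        try (Stream<Path> entries = Files.list(jobCheckpointDir)) {
            return entries
                    .filter(Files::isDirectory)                        // ignore plain files
                    .map(p -> CHK.matcher(p.getFileName().toString()))
                    .filter(Matcher::matches)                          // keep only chk-<digits>
                    .map(m -> Long.parseLong(m.group(1)))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    /**
     * IDs older than the newest numRetained ones. Note that a checkpoint that is
     * still in progress also shows up as a directory and is counted here.
     */
    static List<Long> staleIds(List<Long> sortedIds, int numRetained) {
        int keepFrom = Math.max(0, sortedIds.size() - numRetained);
        return new ArrayList<>(sortedIds.subList(0, keepFrom));
    }

    public static void main(String[] args) throws IOException {
        // Placeholder path: pass the real <checkpoint URI>/<job ID> directory as args[0].
        Path dir = Paths.get(args.length > 0 ? args[0] : "/tmp/flink-checkpoints/job-id-placeholder");
        if (!Files.isDirectory(dir)) {
            System.out.println("Not a directory: " + dir);
            return;
        }
        List<Long> ids = checkpointIds(dir);
        System.out.println("chk directories: " + ids);
        System.out.println("older than the newest 1: " + staleIds(ids, 1));
    }
}
```

Running this a few times while the job is checkpointing shows whether the list keeps growing (old checkpoints are not removed at runtime) or stays short (runtime cleanup works, and the leftovers come from somewhere else, e.g. restarts).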
Best,
Stefan

> On 08.09.2017 at 03:17, rnosworthy <[email protected]> wrote:
>
> Flink 1.3.2
>
> I have 1 VM for the job manager and another for the task manager.
>
> I have a custom windowing trigger, shown below.
>
> My checkpoint data is not clearing.
>
> I have tried to inject a fileStateThresholdSize when instantiating the
> FsStateBackend object, but that didn't work.
>
> I have tried explicitly setting state.checkpoints.num-retained: 1 in the
> flink.yaml file, but that didn't work either.
>
> Not sure what else to try; can someone suggest anything?
>
> Thanks in advance.
>
> Ryan
>
> ======================================================
>
> /**
>  * https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers
>  */
> public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> {
>
>     private static final Logger LOG = LoggerFactory.getLogger(ThresholdTrigger.class);
>     private static final long serialVersionUID = 1L;
>     private static final SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss a");
>
>     private final ValueStateDescriptor<Integer> maxCountDesc =
>         new ValueStateDescriptor<>(
>             "max",
>             TypeInformation.of(new TypeHint<Integer>() {}));
>
>     private final ReducingStateDescriptor<Integer> currentCountDesc =
>         new ReducingStateDescriptor<>(
>             "count",
>             new Sum(),
>             IntSerializer.INSTANCE);
>
>     @Override
>     public TriggerResult onElement(MonitorProbe probe, long timestamp,
>             TimeWindow window, TriggerContext ctx) throws Exception {
>
>         if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>             // if the watermark is already past the window fire immediately
>             clear(window, ctx);
>             return TriggerResult.FIRE_AND_PURGE;
>         }
>
>         ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
>         ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc);
>         currentCount.add(1);
>
>         if (maxCount.value() == null) {
>             maxCount.update(probe.getThresholdConfig().getSampleSize());
>         }
>
>         LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]",
>             probe.getLoggingKey(),
>             window.getStart(), window.getEnd(),
>             sdf.format(new Date(window.getStart())),
>             sdf.format(new Date(window.getEnd())),
>             currentCount.get(), maxCount.value());
>
>         if (currentCount.get().equals(maxCount.value())) {
>             clear(window, ctx);
>             return TriggerResult.FIRE_AND_PURGE;
>         } else {
>             ctx.registerEventTimeTimer(window.maxTimestamp());
>             return TriggerResult.CONTINUE;
>         }
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>         throw new UnsupportedOperationException("This is not processing time trigger");
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window,
>             TriggerContext ctx) throws Exception {
>
>         ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc);
>         ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
>
>         if (currentCount.get().equals(maxCount.value())) {
>             clear(window, ctx);
>             return TriggerResult.FIRE_AND_PURGE;
>         } else {
>             clear(window, ctx);
>             return TriggerResult.PURGE;
>         }
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
>         ctx.getPartitionedState(currentCountDesc).clear();
>         ctx.getPartitionedState(maxCountDesc).clear();
>     }
>
>     @Override
>     public String toString() {
>         return "ThresholdTrigger(" + maxCountDesc + ")";
>     }
>
>     private static class Sum implements ReduceFunction<Integer> {
>
>         private static final long serialVersionUID = 1L;
>
>         @Override
>         public Integer reduce(Integer value1, Integer value2) throws Exception {
>             return value1 + value2;
>         }
>     }
> }
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
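To make the quoted trigger's decisions easier to follow, here is a hypothetical plain-Java model of them for a single key and window. It does not use Flink at all: `ThresholdTriggerModel`, its nested `Result` enum and `hasState()` are made-up names, and the model mirrors only the two pieces of trigger state ("count" and "max"), not timers or the window contents. Tracing it shows that, in this model, both values are empty again after every FIRE_AND_PURGE or PURGE.

```java
/**
 * Hypothetical, Flink-free model of the quoted ThresholdTrigger's decisions
 * for one key and one window. Timers and window contents are not modelled.
 */
public class ThresholdTriggerModel {

    enum Result { CONTINUE, FIRE_AND_PURGE, PURGE }

    private final long windowMaxTimestamp;
    private Integer maxCount;     // like the "max" ValueState: null until the first element
    private Integer currentCount; // like the "count" ReducingState: null when empty

    ThresholdTriggerModel(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    Result onElement(int sampleSize, long currentWatermark) {
        if (windowMaxTimestamp <= currentWatermark) { // watermark already past the window
            clear();
            return Result.FIRE_AND_PURGE;
        }
        currentCount = (currentCount == null) ? 1 : currentCount + 1; // add(1) with Sum
        if (maxCount == null) {
            maxCount = sampleSize; // the first element fixes the threshold for this window
        }
        if (currentCount.equals(maxCount)) {
            clear();
            return Result.FIRE_AND_PURGE;
        }
        return Result.CONTINUE; // the quoted trigger also registers a timer at window.maxTimestamp() here
    }

    Result onEventTime() {
        // End of window: fire only if the threshold was reached exactly; clear state either way.
        boolean reached = currentCount != null && currentCount.equals(maxCount);
        clear();
        return reached ? Result.FIRE_AND_PURGE : Result.PURGE;
    }

    void clear() {
        currentCount = null;
        maxCount = null;
    }

    boolean hasState() {
        return currentCount != null || maxCount != null;
    }

    public static void main(String[] args) {
        ThresholdTriggerModel t = new ThresholdTriggerModel(1_000L);
        System.out.println(t.onElement(3, 0L)); // CONTINUE
        System.out.println(t.onElement(3, 0L)); // CONTINUE
        System.out.println(t.onElement(3, 0L)); // FIRE_AND_PURGE
        System.out.println(t.hasState());       // false
    }
}
```

The model says nothing about how Flink's window operator stores or checkpoints this state; it only restates the control flow of the quoted `onElement`, `onEventTime` and `clear` methods.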
