Flink 1.3.2 I have 1 vm for the job manager and another for task manager.
I have a custom windowing trigger shown below. My checkpoint data is not clearing. I have tried to inject a fileStateThresholdSize when instantiating the FsStateBackend object, but that didn't work. I have tried explicitly setting state.checkpoints.num-retained: 1 in the flink.yaml file but that didn't work either. Not sure what else to try, can someone suggest anything. Thanks in advance. Ryan ====================================================== /** * https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers * */ public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> { private static final Logger LOG = LoggerFactory.getLogger(ThresholdTrigger.class); private static final long serialVersionUID = 1L; private static final SimpleDateFormat sdf = new SimpleDateFormat("dd HH:mm:ss a"); private final ValueStateDescriptor<Integer> maxCountDesc = new ValueStateDescriptor<>( "max", TypeInformation.of(new TypeHint<Integer>() {})); private final ReducingStateDescriptor<Integer> currentCountDesc = new ReducingStateDescriptor<>( "count", new Sum(), IntSerializer.INSTANCE); @Override public TriggerResult onElement(MonitorProbe probe, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { // if the watermark is already past the window fire immediately clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; } ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc); ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc); currentCount.add(1); if (maxCount.value() == null) { maxCount.update(probe.getThresholdConfig().getSampleSize()); } LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]", probe.getLoggingKey(), window.getStart(), window.getEnd(), sdf.format(new Date(window.getStart())), sdf.format(new Date(window.getEnd())), currentCount.get(), maxCount.value()); if (currentCount.get().equals(maxCount.value())){ clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; }else{ ctx.registerEventTimeTimer(window.maxTimestamp()); return TriggerResult.CONTINUE; } } @Override public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { throw new UnsupportedOperationException("This is not processing time trigger"); } @Override public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception { ReducingState<Integer> currentCount = ctx.getPartitionedState(currentCountDesc); ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc); if (currentCount.get().equals(maxCount.value())){ clear(window, ctx); return TriggerResult.FIRE_AND_PURGE; }else{ clear(window, ctx); return TriggerResult.PURGE; } } @Override public void clear(TimeWindow window, TriggerContext ctx) throws Exception { ctx.getPartitionedState(currentCountDesc).clear(); ctx.getPartitionedState(maxCountDesc).clear(); } @Override public String toString() { return "ThresholdTrigger(" + maxCountDesc + ")"; } private static class Sum implements ReduceFunction<Integer> { private static final long serialVersionUID = 1L; @Override public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } } } -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/