Flink 1.3.2

I have one VM for the job manager and another for the task manager.

I have a custom windowing trigger shown below.

My checkpoint data is not clearing.

I have tried to inject a fileStateThresholdSize when instantiating the
FsStateBackend object, but that didn't work.

I have tried explicitly setting state.checkpoints.num-retained: 1 in the
flink.yaml file but that didn't work either.

I'm not sure what else to try. Can someone suggest anything?

Thanks in advance.

Ryan

======================================================


/**
 *
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#built-in-and-custom-triggers
 *
 */
public class ThresholdTrigger extends Trigger<MonitorProbe, TimeWindow> {

  private static final Logger LOG =
LoggerFactory.getLogger(ThresholdTrigger.class);
  private static final long serialVersionUID = 1L;
  private static final SimpleDateFormat sdf = new SimpleDateFormat("dd
HH:mm:ss a");

  private final ValueStateDescriptor<Integer> maxCountDesc =
      new ValueStateDescriptor<>(
          "max",
          TypeInformation.of(new TypeHint<Integer>() {}));

  private final ReducingStateDescriptor<Integer> currentCountDesc =
      new ReducingStateDescriptor<>(
          "count",
          new Sum(),
          IntSerializer.INSTANCE);

  @Override
  public TriggerResult onElement(MonitorProbe probe, long timestamp,
TimeWindow window, TriggerContext ctx)
      throws Exception {

    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // if the watermark is already past the window fire immediately
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    }

    ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);
    ReducingState<Integer> currentCount =
ctx.getPartitionedState(currentCountDesc);
    currentCount.add(1);

    if (maxCount.value() == null) {
      maxCount.update(probe.getThresholdConfig().getSampleSize());
    }

    LOG.info("{} Window: {} - {} ({} - {}), Total Sample Size: [{}/{}]",
        probe.getLoggingKey(),
        window.getStart(), window.getEnd(),
        sdf.format(new Date(window.getStart())),
        sdf.format(new Date(window.getEnd())),
        currentCount.get(), maxCount.value());

    if (currentCount.get().equals(maxCount.value())){
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    }else{
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
    }

  }

  @Override
  public TriggerResult onProcessingTime(long time, TimeWindow window,
TriggerContext ctx)
      throws Exception {
    throw new UnsupportedOperationException("This is not processing time
trigger");
  }

  @Override
  public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) throws Exception {

    ReducingState<Integer> currentCount =
ctx.getPartitionedState(currentCountDesc);
    ValueState<Integer> maxCount = ctx.getPartitionedState(maxCountDesc);

    if (currentCount.get().equals(maxCount.value())){
      clear(window, ctx);
      return TriggerResult.FIRE_AND_PURGE;
    }else{
      clear(window, ctx);
      return TriggerResult.PURGE;
    }
  }

  @Override
  public void clear(TimeWindow window, TriggerContext ctx) throws Exception
{
    ctx.getPartitionedState(currentCountDesc).clear();
    ctx.getPartitionedState(maxCountDesc).clear();
  }

  @Override
  public String toString() {
    return "ThresholdTrigger(" + maxCountDesc + ")";
  }

  private static class Sum implements ReduceFunction<Integer> {

    private static final long serialVersionUID = 1L;

    @Override
    public Integer reduce(Integer value1, Integer value2) throws Exception {
      return value1 + value2;
    }

  }
}




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to