I am reading stats from Kinesis, deserializing them into a stat POJO, and
then running a windowed aggregation like the one below, with no
ProcessWindowFunction defined:

        timestampedStats
                .keyBy(v -> v.groupKey())
                .window(TumblingEventTimeWindows.of(Time.seconds(appCfg.getWindowSize())))
                .aggregate(new ImpactAggregator(appCfg.getSmoothingRange(),
                        appCfg.getThreshold()))
                .sinkTo(getKinesisProducer(appCfg.getAcuWindowsStreamName(),
                        appCfg.getAwsRegion()))
                .name("Kinesis Producer");

As part of the aggregation logic, I look for threshold violations: a field
in each stat is tested against a fixed threshold. For each violation during
the window (300 seconds), I increment a counter in the accumulator and
record some metadata about the stat that violated the threshold. If there
are violations, I want to process the window by serializing its contents to
JSON and publishing them to Kinesis. What I do NOT want is to serialize a
window that has no violations in its accumulator; there is no need to send
a message when no bad conditions are observed.
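
To make the setup concrete, here is a simplified, Flink-free sketch of what
the accumulator does. It is not my actual code: the class name
ViolationAccumulator, its methods, and the "value above threshold" test are
stand-ins chosen for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the accumulator behind ImpactAggregator.
// All names here are illustrative assumptions, not the real implementation.
final class ViolationAccumulator {
    private final double threshold;   // fixed threshold each stat is tested against
    private int violationCount = 0;   // number of stats that violated the threshold
    private final List<String> violators = new ArrayList<>(); // metadata per violation

    ViolationAccumulator(double threshold) {
        this.threshold = threshold;
    }

    // Called once per stat in the window; records the stat only if it violates.
    // Assumption: a violation means the value is strictly above the threshold.
    void add(String statName, double value) {
        if (value > threshold) {
            violationCount++;
            violators.add(statName);
        }
    }

    int getViolationCount() {
        return violationCount;
    }

    List<String> getViolators() {
        return violators;
    }

    // The condition that should decide whether the window is published at all.
    boolean hasViolations() {
        return violationCount > 0;
    }
}
```

A window whose accumulator ends with hasViolations() == false is the one I
would like to drop before it ever reaches the sink.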

   - Could you please tell me how I can just throw away a window and NOT
   serialize it when it is ready to be processed?
   - How do I hook into some event that allows me to do that?
   - Do I need to implement a ProcessWindowFunction alongside my
   AggregateFunction and handle this as part of the process window
   function?

I have created a class that implements SerializationSchema to do the
serialization, but its serialize() method must return a byte[] containing
valid JSON. I think the solution lies elsewhere: somewhere I can elect NOT
to process the window at all, so that serialize() never gets called.

Thank you
