Hi,
Here is my issue with *Event Processing*: the *add() method of
MGroupingWindowAggregate is not being called* even though a new watermark is
fired.
1. *Ingest data from Kinesis (works fine)*
2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*; I get JSON
back)
3. I *assign MonitoringTSWAssigner* (code below) to the source with a bound
of 10 (I have also tried 3000 and 30000). *It fires a new watermark* with each
incoming record, but the *windowStream.aggregate method doesn't seem to fire*
and I *don't see the add() method of MGroupingWindowAggregate being called*.
I *can see the new watermark being emitted in
TimestampsAndPunctuatedWatermarksOperator.processElement*.
4. I have tried timeWindow sizes of 1m and 15s.
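
To make the timing in points 3 and 4 concrete, here is a minimal plain-Java sketch (no Flink dependency) of how, as far as I understand it, a tumbling event-time window relates to the watermark: the window covering a record starts at the timestamp rounded down to a multiple of the window size, and it can only fire once the watermark reaches the window's last millisecond. The class name, method names and sample timestamps are made up for illustration and are not part of my job.

```java
// Plain-Java sketch, not Flink code; all names here are hypothetical.
final class WindowTimingSketch {

    // Start of the tumbling window containing `timestamp`
    // (epoch millis, zero offset, non-negative timestamps assumed).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    // Last timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    // An event-time window can fire once the watermark reaches its max timestamp.
    static boolean canFire(long watermark, long windowStart, long windowSize) {
        return watermark >= windowMaxTimestamp(windowStart, windowSize);
    }

    public static void main(String[] args) {
        long size = 15_000L;  // 15 s window
        long bound = 10L;     // out-of-orderness bound in millis
        long first = 67_000L; // sample event timestamp
        long start = windowStart(first, size);

        System.out.println("window start = " + start);
        System.out.println("window max ts = " + windowMaxTimestamp(start, size));
        // Watermark after the first record is first - bound.
        System.out.println("fires after first record? " + canFire(first - bound, start, size));
        // A later record pushes the watermark past the end of the window.
        long later = 75_010L;
        System.out.println("fires after later record? " + canFire(later - bound, start, size));
    }
}
```

With a bound of 10 ms and a 15 s window, a record at 67,000 ms leaves the watermark at 66,990, so the window [60,000, 75,000) would need a later record at or after 75,009 ms before it could fire.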

*Main* code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

//Setup Kinesis Consumer
Properties kinesisConsumerConfig = new Properties();
..
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
        ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST
FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
        "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);

DataStream<Map<String, Object>> kinesisStream;
RichSinkFunction<InfluxDBPoint> influxSink;

DataStreamSource<Map<String, Object>> monitoringDataStreamSource =
        env.addSource(kinesisConsumer);
kinesisStream = monitoringDataStreamSource
        .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
influxSink = pms.createInfluxMonitoringSink(....);
......
...timeWindow = Time.seconds(timeIntervalL); // tried with timeIntervalL=15s, 1m

KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
        kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
        monitoringTupleKeyedStream.timeWindow(timeWindow);
DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( // <===== never reaches here ?????
        new MGroupingWindowAggregate(interval),
        new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
        .map(new MonitoringGroupingToInfluxDBPoint(rule));
enrichedMGStream.addSink(influxSink);
env.execute("Aggregation of Map data");

*MonitoringTSWAssigner* code:
public class MonitoringTSWAssigner implements
        AssignerWithPunctuatedWatermarks<Map<String, Object>> {
    private long bound = 5 * (long) 1000; // 5 secs out-of-order bound in millisecs
    private long maxTimestamp = Long.MIN_VALUE;

    public MonitoringTSWAssigner() {
    }

    public MonitoringTSWAssigner(long bound) {
        this.bound = bound;
    }

    public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
        long extractedTS = getExtractedTS(monitoring);
        if (extractedTS > maxTimestamp) {
            maxTimestamp = extractedTS;
        }
        return extractedTS; // return System.currentTimeMillis();
    }

    public long getExtractedTS(Map<String, Object> monitoring) {
        final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
                ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
        return Utils.getLongFromDateStr(eventTimestamp);
    }

    @Override
    public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring,
            long extractedTimestamp) {
        long extractedTS = getExtractedTS(monitoring);
        long nextWatermark = maxTimestamp - bound;
        return new Watermark(nextWatermark);
    }
}
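
The watermark arithmetic of the assigner above can be replayed outside Flink. This is a minimal plain-Java sketch under the assumption that extractTimestamp is called before checkAndGetNextWatermark for each record; the class name, the onEvent method and the sample timestamps are hypothetical and only mirror the max-timestamp-minus-bound logic.

```java
// Plain-Java sketch of the assigner's arithmetic; names are hypothetical.
final class PunctuatedWatermarkSketch {
    private final long bound;                   // out-of-orderness bound in millis
    private long maxTimestamp = Long.MIN_VALUE; // highest event timestamp seen so far

    PunctuatedWatermarkSketch(long bound) {
        this.bound = bound;
    }

    // Combines both assigner steps: track the max timestamp,
    // then return the watermark value maxTimestamp - bound.
    long onEvent(long eventTimestamp) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
        return maxTimestamp - bound;
    }

    public static void main(String[] args) {
        PunctuatedWatermarkSketch sketch = new PunctuatedWatermarkSketch(10L);
        long[] events = {1_000L, 1_500L, 1_200L, 2_000L}; // third record is out of order
        for (long ts : events) {
            System.out.println("event=" + ts + " watermark=" + sketch.onEvent(ts));
        }
    }
}
```

An out-of-order record (1,200 after 1,500) leaves the watermark unchanged at 1,490. One edge case of this arithmetic: if the extracted timestamp were ever Long.MIN_VALUE (I have not checked what Utils.getLongFromDateStr returns for an empty string), maxTimestamp - bound would overflow to a very large positive value.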

*MGroupingWindowAggregate*:
public class MGroupingWindowAggregate implements
        AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
    private final String interval;
    public MGroupingWindowAggregate(String interval) {
        this.interval = interval;
    }
    public Map<String, Object> createAccumulator() {
        return new ConcurrentHashMap<>();
    }

    public Map<String, Object> add(Map<String, Object> monitoring,
            Map<String, Object> timedMap) {
.....
}

.....

}
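
For context on what I expect to happen, here is a minimal plain-Java sketch of the AggregateFunction contract as I understand it: one createAccumulator() per window, one add() per incoming element, and one getResult() when the window fires. The local Agg interface is only a stand-in with the same four methods as Flink's AggregateFunction, and CountingAggregate is a hypothetical example, not my MGroupingWindowAggregate.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch, not Flink code; all names here are hypothetical.
final class AggregateContractSketch {

    // Local stand-in mirroring the four methods of AggregateFunction<IN, ACC, OUT>.
    interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical aggregate that counts the records seen in one window.
    static final class CountingAggregate
            implements Agg<Map<String, Object>, Map<String, Object>, Map<String, Object>> {

        public Map<String, Object> createAccumulator() {
            Map<String, Object> acc = new HashMap<>();
            acc.put("count", 0L);
            return acc;
        }

        public Map<String, Object> add(Map<String, Object> value, Map<String, Object> acc) {
            acc.put("count", (Long) acc.get("count") + 1L);
            return acc;
        }

        public Map<String, Object> getResult(Map<String, Object> acc) {
            return acc;
        }

        public Map<String, Object> merge(Map<String, Object> a, Map<String, Object> b) {
            a.put("count", (Long) a.get("count") + (Long) b.get("count"));
            return a;
        }
    }

    // Drives the aggregate the way I expect a window to:
    // one add() per element, one getResult() at firing time.
    static long countWindow(List<? extends Map<String, Object>> elements) {
        CountingAggregate agg = new CountingAggregate();
        Map<String, Object> acc = agg.createAccumulator();
        for (Map<String, Object> element : elements) {
            acc = agg.add(element, acc);
        }
        return (Long) agg.getResult(acc).get("count");
    }

    public static void main(String[] args) {
        List<Map<String, Object>> elements = new java.util.ArrayList<>();
        elements.add(new HashMap<>());
        elements.add(new HashMap<>());
        System.out.println("records aggregated: " + countWindow(elements));
    }
}
```

If this reading of the contract is right, add() should run as soon as a record is assigned to a window, independent of when the window fires.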

TIA,
