Hi Theo,

You were right. For some reason (I still haven't figured out why), the FilterFunction was causing the issue. After I commented it out, execution started reaching the add() method of the AggregateFunction passed to aggregate().
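A possible reason the filter dropped everything is offered here as an assumption, not a confirmed diagnosis. Later in the same method, `groupBy` is split on `KEY_DELIMITER` into several field names. If it ever holds more than one name (for example `host,region`), then `inputMap.get(groupBy)` looks up the whole joined string. That string is never a key, so the predicate returns false for every record. This would also explain why `MapTupleKeySelector.getKey()` was never reached. The sketch below is plain Java with no Flink dependency. The class name, the field names and the `,` delimiter are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the suspected filter problem; not from the thread.
final class GroupByFilterSketch {

    // Same logic as the commented-out filter: the whole groupBy string is used as one key.
    static boolean originalPredicate(Map<String, Object> input, String groupBy) {
        return input.get(groupBy) != null;
    }

    // Assumed fix: split groupBy the same way groupBySet is built (String.split with
    // the delimiter) and require every individual field to be present.
    static boolean hasAllGroupByFields(Map<String, Object> input, String groupBy, String delimiter) {
        if (groupBy == null || groupBy.isEmpty()) {
            return false; // like the original, an empty groupBy matches nothing
        }
        return Arrays.stream(groupBy.split(delimiter))
                .allMatch(field -> input.get(field) != null);
    }

    public static void main(String[] args) {
        Map<String, Object> record = new HashMap<>();
        record.put("host", "web-01");          // hypothetical group-by fields
        record.put("region", "us-east-1");
        String groupBy = "host,region";        // assumes KEY_DELIMITER is ","

        System.out.println(originalPredicate(record, groupBy));        // false: record dropped
        System.out.println(hasAllGroupByFields(record, groupBy, ","));  // true: record kept
    }
}
```

Inside the job, such a check could replace the original lambda, for example `kinesisStream.filter(m -> GroupByFilterSketch.hasAllGroupByFields(m, groupBy, KEY_DELIMITER))`. Whether this matches the real cause depends on what `GROUP_BY` actually contains in the rule tags.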
/*kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    return groupByValueObj != null;
});*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();

TIA,
Vijay

On Tue, Oct 15, 2019 at 9:34 AM Vijay Balakrishnan <bvija...@gmail.com> wrote:

> Hi Theo,
> It gets to the FilterFunction during the initial creation of the ExecutionGraph,
> but not at runtime when records are streaming in. So it is
> not getting that far; it seems to be stuck in the
>
> final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
>         kinesisStream.filter
>
> code. It doesn't seem to get past it. It keeps incrementing watermarks, but the
> watermark never seems to reach the end of the window. Maybe I am doing
> something super simple and stupid.
>
> TIA,
> Vijay
>
> On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Vijay,
>>
>> Maybe a stupid question, but according to your comments, the code works
>> fine up to a "flatMap" operation. That flatMap seems to be directly
>> followed by a filter function in the method
>> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
>> all events? Or is the filter function itself not even called (as your
>> comments suggest)?
>>
>> Best regards
>> Theo
>>
>> ------------------------------
>> From: "Vijay Balakrishnan" <bvija...@gmail.com>
>> To: "Dawid Wysakowicz" <dwysakow...@apache.org>
>> CC: "user" <user@flink.apache.org>
>> Sent: Tuesday, 15 October 2019 02:01:05
>> Subject: Re: add() method of AggregateFunction not called even though
>> new watermark is emitted
>>
>> Hi,
>> Thanks for the replies, Congxian & Dawid.
>> Watermarks are advancing. I'm not sure how to check whether each newly
>> generated watermark reaches the end of the window.
>>
>> I did check the Flink UI for the currentInputWatermark, and it is
>> increasing monotonically.
>>
>> I narrowed the problem down to windowStream.aggregate not being called.
>> I also added a checkpoint to see whether that was causing the issue. It
>> didn't seem to help.
>> Most of the code is reached only while the ExecutionGraph is created at
>> the start of the program.
>>
>> I generate an incrementing sequence of timestamps (a delay of 5000 ms between
>> records) from a producer to Kinesis. A new watermark is emitted as soon as
>> input records start arriving.
>> My window size is 15 s.
>> I see that a WindowedStream is created with windowAssigner
>> TumblingEventTimeWindows(15000) and trigger EventTimeTrigger.
>> However, the code never gets into EventTimeTrigger.onElement() or
>> onEventTime() to fire the trigger.
>> It does get into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
>> I even tried ProcessingTime, but that didn't help either.
>>
>>
>> //code to create kinesis consumer successfully......
>> for (Rule rule : rules.getRules()) {
>>     //gets in here fine
>>     final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
>>         kinesisStream.filter(mon -> {
>>             boolean result;
>>             String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
>>             InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>>             String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
>>             result = eventName.equals(measurement);
>>             if (result) {
>>                 Map<String, String> inputTags = mon.get(TAGS) != null
>>                     ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
>>                 Map<String, String> ruleTags = inputMetricSelector != null
>>                     ? inputMetricSelector.getTags() : new HashMap<>();
>>                 result = matchTags(inputTags, ruleTags);
>>             }
>>             return result; // <== this is true
>>         })
>>         .flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
>>             out.collect(input); // <==== runs up till here fine
>>         })
>>         .returns(new TypeHint<Map<String, Object>>() {
>>         });
>>     // doesn't do anything beyond this point at runtime
>>     DataStream<InfluxDBPoint> enrichedMGStream =
>>         pms.createAggregatedMonitoringGroupingWindowStream1(filteredKinesisStream, ruleFactory, rule, parallelProcess);
>>     enrichedMGStream.addSink(influxSink)
>>         .setParallelism(nbrSinks);
>> }
>>
>> private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
>>         DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory, Rule rule, int parallelProcess) {
>>     DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
>>     RuleConfig ruleConfig = rule.getRuleConfig();
>>     String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
>>     RuleIF ruleImpl = ruleFactory.getRule(ruleType);
>>     Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
>>     Object intervalObj = ruleProps.get("rule_eval_window");
>>     String timeInterval = intervalObj != null ? (String) intervalObj : "";
>>     org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);
>>
>>     Object windowTypeObj = ruleProps.get("window_type");
>>     String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
>>
>>     InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>>     Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
>>     String groupByObj = tags.get(GROUP_BY);
>>     String groupBy = groupByObj != null ? groupByObj : "";
>>     kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
>>         Object groupByValueObj = inputMap.get(groupBy);
>>         return groupByValueObj != null;
>>     });
>>     Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
>>     String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
>>     //till here, it went through fine during creation of ExecutionGraph
>>     KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); // <=== never gets into MapTupleKeySelector.getKey(); a similar class works in another project
>>     enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream, timeWindow,
>>         windowType, timeInterval, ruleImpl, rule, parallelProcess);
>>     return enrichedComponentInstanceStream1;
>> }
>>
>> private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
>>         KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
>>         org.apache.flink.streaming.api.windowing.time.Time timeWindow, String windowType,
>>         String interval, RuleIF ruleImpl, Rule rule, int parallelProcess) {
>>     long slide = 100;
>>     final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>         windowType.equalsIgnoreCase(SLIDING)
>>             ? monitoringTupleKeyedStream.timeWindow(timeWindow,
>>                 org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide))
>>             : monitoringTupleKeyedStream.timeWindow(timeWindow);
>>     return windowStream.aggregate(
>>             new MGroupingWindowAggregate(interval), // <=== never gets into add() here
>>             new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
>> }
>>
>> On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> Could you check whether the watermark for the aggregate operator advances?
>>> You should be able to check that in the Flink Web UI. Could it be that the
>>> watermark does not advance for all of the upstream operators? The watermark
>>> for a particular operator is the minimum of the watermarks received from all
>>> of the upstream operators. Therefore, if some of them do not produce any, the
>>> resulting watermark will not advance.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>>>
>>> Hi,
>>> Here is my issue with event processing: the add() method of
>>> MGroupingWindowAggregate is not being called even though a new watermark
>>> is fired.
>>> 1. Ingest data from Kinesis (works fine).
>>> 2. Deserialize in MonitoringMapKinesisSchema (works fine, and I get
>>>    JSON back).
>>> 3. I assign MonitoringTSWAssigner (code below) to the source with a
>>>    bound of 10 (I have also tried 3000 and 30000). It fires a new watermark
>>>    with each incoming record, but the windowStream.aggregate method doesn't
>>>    seem to fire, and I don't see the add() method of MGroupingWindowAggregate
>>>    being called. I can see the new watermark being emitted in
>>>    TimestampsAndPunctuatedWatermarksOperator.processElement.
>>> 4. I have tried with a timeWindow of 1m and of 15s.
>>>
>>> Main code:
>>>
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>
>>> //Setup Kinesis Consumer
>>> Properties kinesisConsumerConfig = new Properties();
>>> ..
>>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>>>     ConsumerConfigConstants.InitialPosition.LATEST.name()); //LATEST
>>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>>     "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>>>
>>> DataStream<Map<String, Object>> kinesisStream;
>>> RichSinkFunction<InfluxDBPoint> influxSink;
>>>
>>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer);
>>> kinesisStream = monitoringDataStreamSource
>>>     .assignTimestampsAndWatermarks(new MonitoringTSWAssigner(bound));
>>> influxSink = pms.createInfluxMonitoringSink(....);
>>> ......
>>> ...timeWindow = Time.seconds(timeIntervalL); //tried with timeIntervalL=15s, 1m
>>>
>>> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>>     kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>>> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>>     monitoringTupleKeyedStream.timeWindow(timeWindow);
>>> DataStream<InfluxDBPoint> enrichedMGStream = windowStream.aggregate( //<===== never reaches here
>>>         new MGroupingWindowAggregate(interval),
>>>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>>     .map(new MonitoringGroupingToInfluxDBPoint(rule));
>>> enrichedMGStream.addSink(influxSink);
>>> env.execute("Aggregation of Map data");
>>>
>>> MonitoringTSWAssigner code:
>>>
>>> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>>>     private long bound = 5 * (long) 1000; //5 secs out of order bound in millisecs
>>>     private long maxTimestamp = Long.MIN_VALUE;
>>>
>>>     public MonitoringTSWAssigner() {
>>>     }
>>>
>>>     public MonitoringTSWAssigner(long bound) {
>>>         this.bound = bound;
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>>>         long extractedTS = getExtractedTS(monitoring);
>>>         if (extractedTS > maxTimestamp) {
>>>             maxTimestamp = extractedTS;
>>>         }
>>>
>>>         return extractedTS; //return System.currentTimeMillis();
>>>     }
>>>
>>>     public long getExtractedTS(Map<String, Object> monitoring) {
>>>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>>>             ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>>>         return Utils.getLongFromDateStr(eventTimestamp);
>>>     }
>>>
>>>     @Override
>>>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
>>>         long extractedTS = getExtractedTS(monitoring);
>>>         long nextWatermark = maxTimestamp - bound;
>>>         return new Watermark(nextWatermark);
>>>     }
>>> }
>>>
>>> MGroupingWindowAggregate:
>>>
>>> public class MGroupingWindowAggregate
>>>         implements AggregateFunction<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>>>     private final String interval;
>>>
>>>     public MGroupingWindowAggregate(String interval) {
>>>         this.interval = interval;
>>>     }
>>>
>>>     public Map<String, Object> createAccumulator() {
>>>         return new ConcurrentHashMap<>();
>>>     }
>>>
>>>     public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
>>>         .....
>>>     }
>>>
>>>     .....
>>>
>>> }
>>>
>>> TIA,
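The following is a small plain-Java simulation of when the 15 s tumbling window in this thread should fire. It assumes a zero window offset, events starting at timestamp 0 and spaced 5000 ms apart, and the assigner's default bound of 5000 ms. It mirrors three rules: the assigner's `maxTimestamp - bound` watermark, the EventTimeTrigger condition that a window fires once the watermark reaches its last millisecond (`end - 1`), and Dawid's point that an operator's watermark is the minimum across its inputs. The class and method names are illustrative only and are not Flink APIs.

```java
import java.util.Arrays;

// Illustrative simulation only; none of these methods are Flink APIs.
final class WatermarkWindowSketch {

    static final long WINDOW_SIZE_MS = 15_000L; // TumblingEventTimeWindows(15000) from the thread

    // Start of the tumbling window containing ts, assuming zero offset and ts >= 0
    // (same arithmetic as Flink's TimeWindow.getWindowStartWithOffset(ts, 0, size)).
    static long windowStart(long ts) {
        return ts - (ts % WINDOW_SIZE_MS);
    }

    // An event-time window [start, start + size) fires once the watermark reaches
    // its last millisecond, start + size - 1.
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_SIZE_MS - 1;
    }

    // Watermark rule used by MonitoringTSWAssigner: highest timestamp seen minus the bound.
    static long punctuatedWatermark(long maxTimestamp, long bound) {
        return maxTimestamp - bound;
    }

    // An operator's watermark is the minimum over its inputs, so one input that
    // never emits a watermark holds the result back.
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long bound = 5_000L;
        long max = Long.MIN_VALUE;
        // Hypothetical event timestamps 0, 5000, ..., 25000.
        for (long ts = 0; ts <= 25_000; ts += 5_000) {
            max = Math.max(max, ts);
            long wm = punctuatedWatermark(max, bound);
            System.out.printf("event=%d watermark=%d window[0,15000) fires=%b%n",
                    ts, wm, fires(windowStart(0), wm));
        }
        // An input still at Long.MIN_VALUE pins the combined watermark.
        System.out.println(combinedWatermark(20_000L, Long.MIN_VALUE));
    }
}
```

In this simulation, the window [0, 15000) fires only when the event stamped 20000 arrives. That is the first point where the watermark (15000) is at or past 14999, so a window needs an event at least `bound` past its end before it can fire. The last line prints `Long.MIN_VALUE`. This shows how a single input that has not yet emitted a watermark, such as an idle parallel source subtask, would keep the aggregate operator's watermark from advancing.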