Hi Theo,
You were right. For some reason (I still haven't figured out why), the
FilterFunction was causing the issue. I commented it out and the code
started getting into the add() method of the AggregateFunction.

/*
kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    return groupByValueObj != null;
});
*/
//String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
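
In case I re-enable it later: a variant of that filter which logs what it drops
might show why records were being swallowed (a sketch; groupBy is from the
snippet above, while the LOG field is an assumption):

// Hypothetical diagnostic variant of the commented-out filter: same null
// check, but each dropped record is logged so the cause becomes visible.
// Assumption: an SLF4J logger named LOG exists in this class.
kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
    Object groupByValueObj = inputMap.get(groupBy);
    if (groupByValueObj == null) {
        LOG.warn("Dropping record without groupBy key '{}': {}", groupBy, inputMap);
    }
    return groupByValueObj != null;
});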

TIA,

Vijay




On Tue, Oct 15, 2019 at 9:34 AM Vijay Balakrishnan <bvija...@gmail.com>
wrote:

> Hi Theo,
> It gets to the FilterFunction during the creation of the ExecutionGraph
> initially, but not at runtime when records are streaming in. So it is
> not getting that far - it seems to be stuck in the
>
> final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
>         kinesisStream.filter(...)
>
> code. It doesn't seem to get past it: the watermarks keep incrementing,
> but the watermark never seems to hit the end of the window. Maybe I am
> doing something super simple and stupid.
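>
> For reference (paraphrased from the Flink sources, so this is Flink's
> logic, not mine): EventTimeTrigger.onElement() only fires the window once
> the watermark has passed the window's last timestamp:
>
> if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
>     return TriggerResult.FIRE;
> } else {
>     ctx.registerEventTimeTimer(window.maxTimestamp());
>     return TriggerResult.CONTINUE;
> }
>
> So with a 15s window, the aggregate stays silent until the watermark
> reaches windowStart + 15000 - 1.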
>
> TIA,
> Vijay
>
> On Tue, Oct 15, 2019 at 12:48 AM Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
>
>> Hi Vijay,
>>
>> Maybe a stupid question, but according to your comments, the code works
>> fine up till a "flatMap" operation. It seems that this flatMap is directly
>> followed by a filter function in the method
>> createAggregatedMonitoringGroupingWindowStream1. Is it maybe filtering out
>> all events? Or is the filter function itself not even called? (Your
>> comments suggest that it might not be.)
>>
>> Best regards
>> Theo
>>
>> ------------------------------
>> *From: *"Vijay Balakrishnan" <bvija...@gmail.com>
>> *To: *"Dawid Wysakowicz" <dwysakow...@apache.org>
>> *CC: *"user" <user@flink.apache.org>
>> *Sent: *Tuesday, 15 October 2019, 02:01:05
>> *Subject: *Re: add() method of AggregateFunction not called even though
>> new watermark is emitted
>>
>> Hi,
>> Thx for the replies - Congxian & Dawid.
>> Watermarks are advancing. I am not sure how to check whether every newly
>> generated watermark is reaching the end of the window.
>>
>> I did check the Flink UI for the currentInputWatermark and it is
>> increasing monotonically.
>>
>> I narrowed the problem down to windowStream.aggregate never being called.
>> I also *added a checkpoint* to see if that was causing the issue; it
>> didn't seem to help.
>> Most of the code is reached during the creation of the ExecutionGraph at
>> the start of the program.
>>
>> I generate an incrementing sequence of timestamps (a delay of 5000 ms
>> between each record) from a Producer to Kinesis, and the job emits a new
>> watermark as it starts receiving the input records.
>> My window size is 15s.
>> I see that a WindowedStream is created with windowAssigner:
>> TumblingEventTimeWindows(15000) and trigger: EventTimeTrigger,
>> but the *code never gets into EventTimeTrigger.onElement() or
>> onEventTime() to fire the trigger*.
>> It gets into TimestampsAndPunctuatedWatermarksOperator and emitWatermark().
>> I even tried ProcessingTime, but that didn't help either.
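>>
>> As a sanity check, one could print the watermark each element actually
>> sees with a pass-through ProcessFunction
>> (org.apache.flink.streaming.api.functions.ProcessFunction); this is a
>> debugging sketch, not part of the job below:
>>
>> kinesisStream = kinesisStream.process(
>>         new ProcessFunction<Map<String, Object>, Map<String, Object>>() {
>>     @Override
>>     public void processElement(Map<String, Object> value, Context ctx,
>>             Collector<Map<String, Object>> out) {
>>         // compare the element timestamp against the watermark and the
>>         // 15s window boundaries
>>         System.out.println("ts=" + ctx.timestamp()
>>                 + " watermark=" + ctx.timerService().currentWatermark());
>>         out.collect(value);
>>     }
>> });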
>>
>>
>> // code to create the Kinesis consumer (works fine) ......
>> for (Rule rule : rules.getRules()) {
>>     // gets in here fine
>>     final SingleOutputStreamOperator<Map<String, Object>> filteredKinesisStream =
>>             kinesisStream.filter(mon -> {
>>                 boolean result;
>>                 String eventName = mon.get(MEASUREMENT) != null ? (String) mon.get(MEASUREMENT) : "";
>>                 InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>>                 String measurement = inputMetricSelector != null ? inputMetricSelector.getMeasurement() : "";
>>                 result = eventName.equals(measurement);
>>                 if (result) {
>>                     Map<String, String> inputTags = mon.get(TAGS) != null ? (Map<String, String>) mon.get(TAGS) : new HashMap<>();
>>                     Map<String, String> ruleTags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
>>                     result = matchTags(inputTags, ruleTags);
>>                 }
>>                 return result; // *<== this is true*
>>             })
>>             .flatMap((FlatMapFunction<Map<String, Object>, Map<String, Object>>) (input, out) -> {
>>                 out.collect(input); // *<==== runs up till here fine*
>>             })
>>             .returns(new TypeHint<Map<String, Object>>() {
>>             });
>>     // *doesn't do anything beyond this point at runtime*
>>     DataStream<InfluxDBPoint> enrichedMGStream = pms.createAggregatedMonitoringGroupingWindowStream1(
>>             filteredKinesisStream, ruleFactory, rule, parallelProcess);
>>     enrichedMGStream.addSink(influxSink)
>>             .setParallelism(nbrSinks);
>> }
>>
>> private DataStream<InfluxDBPoint> createAggregatedMonitoringGroupingWindowStream1(
>>         DataStream<Map<String, Object>> kinesisStream, RuleFactory ruleFactory,
>>         Rule rule, int parallelProcess) {
>>     DataStream<InfluxDBPoint> enrichedComponentInstanceStream1;
>>     RuleConfig ruleConfig = rule.getRuleConfig();
>>     String ruleType = ruleConfig != null ? ruleConfig.getRuleType() : "";
>>     RuleIF ruleImpl = ruleFactory.getRule(ruleType);
>>     Map<String, Object> ruleProps = ruleConfig != null ? ruleConfig.getRuleProps() : new HashMap<>();
>>     Object intervalObj = ruleProps.get("rule_eval_window");
>>     String timeInterval = intervalObj != null ? (String) intervalObj : "";
>>     org.apache.flink.streaming.api.windowing.time.Time timeWindow = getTimeWindowFromInterval(timeInterval);
>>
>>     Object windowTypeObj = ruleProps.get("window_type");
>>     String windowType = windowTypeObj != null ? (String) windowTypeObj : "";
>>
>>     InputMetricSelector inputMetricSelector = rule.getInputMetricSelector();
>>     Map<String, String> tags = inputMetricSelector != null ? inputMetricSelector.getTags() : new HashMap<>();
>>     String groupByObj = tags.get(GROUP_BY);
>>     String groupBy = groupByObj != null ? groupByObj : "";
>>     kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap -> {
>>         Object groupByValueObj = inputMap.get(groupBy);
>>         return groupByValueObj != null;
>>     });
>>     Set<String> groupBySet = new HashSet<>(Arrays.asList(groupBy.split(KEY_DELIMITER)));
>>     String metric = Objects.requireNonNull(inputMetricSelector).getMetric();
>>     // till here, it went through fine during creation of the ExecutionGraph
>>     KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>             kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric)); // *<=== never gets into MapTupleKeySelector.getKey() - a similar class works in another project*
>>     enrichedComponentInstanceStream1 = getMonitoringGroupDataStream1(monitoringTupleKeyedStream,
>>             timeWindow, windowType, timeInterval, ruleImpl, rule, parallelProcess);
>>     return enrichedComponentInstanceStream1;
>> }
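>>
>> One thing I notice while pasting this: groupBy is the raw tag value that I
>> later split on KEY_DELIMITER into groupBySet, so if it is a composite key
>> such as "a<delim>b", inputMap.get(groupBy) would always be null and the
>> filter would drop every record. A variant that checks the individual parts
>> instead (a sketch, not tested; groupBySet would have to be built before the
>> filter):
>>
>> // Assumption: the map contains the individual key parts, never the
>> // delimiter-joined composite string.
>> kinesisStream = kinesisStream.filter((FilterFunction<Map<String, Object>>) inputMap ->
>>         groupBySet.stream().allMatch(key -> inputMap.get(key) != null));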
>>
>> private DataStream<InfluxDBPoint> getMonitoringGroupDataStream1(
>>         KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream,
>>         org.apache.flink.streaming.api.windowing.time.Time timeWindow,
>>         String windowType, String interval, RuleIF ruleImpl, Rule rule, int parallelProcess) {
>>     long slide = 100;
>>     final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>             windowType.equalsIgnoreCase(SLIDING)
>>                     ? monitoringTupleKeyedStream.timeWindow(timeWindow,
>>                             org.apache.flink.streaming.api.windowing.time.Time.milliseconds(slide))
>>                     : monitoringTupleKeyedStream.timeWindow(timeWindow);
>>     return windowStream.aggregate(
>>             new MGroupingWindowAggregate(interval), // *<=== never gets into add() here*
>>             new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>             .map(new MonitoringGroupingToInfluxDBPoint(rule));
>> }
>>
>> On Mon, Oct 14, 2019 at 12:41 AM Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Vijay,
>>>
>>> Could you check whether the watermark for the aggregate operator advances?
>>> You should be able to check that in the Flink WebUI. Could it be that the
>>> watermark does not advance for all of the upstream operators? The watermark
>>> for a particular operator is the minimum of the watermarks received from
>>> all of the upstream operators. Therefore, if some of them do not produce
>>> any, the resulting watermark will not advance.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 11/10/2019 21:37, Vijay Balakrishnan wrote:
>>>
>>> Hi,
>>> Here is my issue with *event processing*: the *add() method of
>>> MGroupingWindowAggregate is not being called*, even though a new watermark
>>> is fired.
>>> 1. *Ingest data from Kinesis (works fine)*.
>>> 2. *Deserialize* in MonitoringMapKinesisSchema (*works fine*; I get the
>>> JSON back).
>>> 3. I *assign a MonitoringTSWAssigner* (code below) to the source with a
>>> bound of 10 (I have also tried 3000 and 30000). *It fires a new watermark*
>>> with each incoming record, but the *windowStream.aggregate method doesn't
>>> seem to fire*, and I *don't see the add() method of MGroupingWindowAggregate
>>> called*. I *can see the new watermark being emitted in
>>> TimestampsAndPunctuatedWatermarksOperator.processElement*.
>>> 4. I have tried timeWindows of 1m and 15s.
>>>
>>> *Main* code:
>>>
>>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setStreamTimeCharacteristic(TimeCharacteristic.*EventTime*);
>>>
>>> // Set up the Kinesis consumer
>>> Properties kinesisConsumerConfig = new Properties();
>>> ..
>>> kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
>>>         ConsumerConfigConstants.InitialPosition.LATEST.name()); // LATEST
>>> FlinkKinesisConsumer<Map<String, Object>> kinesisConsumer = new FlinkKinesisConsumer<>(
>>>         "kinesisTopicRead", new MonitoringMapKinesisSchema(true), kinesisConsumerConfig);
>>>
>>> DataStream<Map<String, Object>> kinesisStream;
>>> RichSinkFunction<InfluxDBPoint> influxSink;
>>>
>>> DataStreamSource<Map<String, Object>> monitoringDataStreamSource = env.addSource(kinesisConsumer);
>>> kinesisStream = monitoringDataStreamSource
>>>         .assignTimestampsAndWatermarks(new *MonitoringTSWAssigner*(bound));
>>> influxSink = pms.createInfluxMonitoringSink(....);
>>> ......
>>> ...timeWindow = Time.seconds(*timeIntervalL*); // tried with timeIntervalL = 15s, 1m
>>>
>>> KeyedStream<Map<String, Object>, MonitoringTuple> monitoringTupleKeyedStream =
>>>         kinesisStream.keyBy(new MapTupleKeySelector(groupBySet, metric));
>>> final WindowedStream<Map<String, Object>, MonitoringTuple, TimeWindow> windowStream =
>>>         monitoringTupleKeyedStream.timeWindow(timeWindow);
>>> DataStream<InfluxDBPoint> enrichedMGStream = *windowStream.aggregate*( // *<===== never reaches here ?????*
>>>         *new MGroupingWindowAggregate(interval)*,
>>>         new MGroupingAggregateWindowProcessing(interval, ruleImpl, rule))
>>>         .map(new MonitoringGroupingToInfluxDBPoint(rule));
>>> enrichedMGStream.addSink(influxSink);
>>> env.execute("Aggregation of Map data");
>>>
>>> *MonitoringTSWAssigner* code:
>>> public class MonitoringTSWAssigner implements AssignerWithPunctuatedWatermarks<Map<String, Object>> {
>>>     private long bound = 5 * (long) 1000; // 5 secs out-of-order bound in millisecs
>>>     private long maxTimestamp = Long.MIN_VALUE;
>>>
>>>     public MonitoringTSWAssigner() {
>>>     }
>>>
>>>     public MonitoringTSWAssigner(long bound) {
>>>         this.bound = bound;
>>>     }
>>>
>>>     @Override
>>>     public long extractTimestamp(Map<String, Object> monitoring, long previousTS) {
>>>         long extractedTS = getExtractedTS(monitoring);
>>>         if (extractedTS > maxTimestamp) {
>>>             maxTimestamp = extractedTS;
>>>         }
>>>         return extractedTS; // return System.currentTimeMillis();
>>>     }
>>>
>>>     public long getExtractedTS(Map<String, Object> monitoring) {
>>>         final String eventTimestamp = monitoring.get(Utils.EVENT_TIMESTAMP) != null
>>>                 ? (String) monitoring.get(Utils.EVENT_TIMESTAMP) : "";
>>>         return Utils.getLongFromDateStr(eventTimestamp);
>>>     }
>>>
>>>     @Override
>>>     public Watermark checkAndGetNextWatermark(Map<String, Object> monitoring, long extractedTimestamp) {
>>>         long extractedTS = getExtractedTS(monitoring);
>>>         long nextWatermark = maxTimestamp - bound;
>>>         return new Watermark(nextWatermark);
>>>     }
>>> }
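>>>
>>> Since the watermark emitted here is always maxTimestamp - bound, a window
>>> ending at time t only fires once maxTimestamp has reached at least
>>> t + bound - 1 (i.e. watermark >= t - 1). An equivalent periodic assigner
>>> that makes this explicit would be (a sketch; Utils.getLongFromDateStr and
>>> bound are from the code above, the rest is an assumption):
>>>
>>> // Sketch: periodic bounded-out-of-orderness assigner instead of the
>>> // punctuated one; watermark = max seen timestamp - bound. The class is
>>> // org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor.
>>> kinesisStream = monitoringDataStreamSource.assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<Map<String, Object>>(
>>>                 Time.milliseconds(bound)) {
>>>             @Override
>>>             public long extractTimestamp(Map<String, Object> monitoring) {
>>>                 Object ts = monitoring.get(Utils.EVENT_TIMESTAMP);
>>>                 return Utils.getLongFromDateStr(ts != null ? (String) ts : "");
>>>             }
>>>         });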
>>>
>>> *MGroupingWindowAggregate*:
>>> public class MGroupingWindowAggregate implements *AggregateFunction*<Map<String, Object>, Map<String, Object>, Map<String, Object>> {
>>>     private final String interval;
>>>
>>>     public MGroupingWindowAggregate(String interval) {
>>>         this.interval = interval;
>>>     }
>>>
>>>     public Map<String, Object> createAccumulator() {
>>>         return new ConcurrentHashMap<>();
>>>     }
>>>
>>>     public Map<String, Object> add(Map<String, Object> monitoring, Map<String, Object> timedMap) {
>>>         .....
>>>     }
>>>
>>>     .....
>>> }
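>>>
>>> To isolate whether the window ever fires at all, I could swap in a
>>> trivially small aggregate first (a hypothetical sanity check, not the
>>> real logic; as a side note, the accumulator is only accessed by one
>>> thread per key and window, so a plain HashMap would do instead of
>>> ConcurrentHashMap):
>>>
>>> // Hypothetical minimal aggregate: just counts elements per key/window,
>>> // logging every add() call so any firing becomes visible.
>>> public class CountingAggregate implements AggregateFunction<Map<String, Object>, Long, Long> {
>>>     public Long createAccumulator() {
>>>         return 0L;
>>>     }
>>>     public Long add(Map<String, Object> monitoring, Long acc) {
>>>         System.out.println("add() called, count=" + (acc + 1));
>>>         return acc + 1;
>>>     }
>>>     public Long getResult(Long acc) {
>>>         return acc;
>>>     }
>>>     public Long merge(Long a, Long b) {
>>>         return a + b;
>>>     }
>>> }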
>>>
>>> TIA,