Re: Flink application question

2021-08-09 Thread David Anderson
FYI, I've responded to this on Stack Overflow:
https://stackoverflow.com/questions/68715430/apache-flink-accessing-keyed-state-from-late-window


On Mon, Aug 9, 2021 at 3:16 AM suman shil  wrote:

> I am writing a Flink application which consumes time series data from a
> Kafka topic. The time series data has components like a metric name, a tag
> key-value pair, a timestamp and a value. I have created a tumbling window to
> aggregate data based on a metric key (which is a combination of the metric
> name, the key-value pair and the timestamp). Here is what the main stream
> looks like:
>
> Kafka source -> Flat Map which parses and emits Metric -> Key by metric
> key -> Tumbling window of 60 seconds -> Aggregate the data -> Write to the
> sink.
>
> I also want to check whether any metrics arrived late, outside the above
> window. I want to count how many metrics arrived late and calculate the
> percentage of late metrics compared to the original metrics. I am thinking
> of using the "allowedLateness" feature of Flink to send the late metrics to
> a different stream. I am planning to add a "MapState" to the main
> "Aggregate the data" operator, with the metric key as the key and the count
> of the metrics that arrived in the main window as the value.
>
>
> Kafka source -> Flat Map which parses and emits Metric -> Key by metric
> key -> Tumbling window of 60 seconds -> Aggregate the data (maintain a map
> state of metric counts) -> Write to the sink
>      \
>       \
>        Late data -> Key by metric key -> Collect late metrics and find the
>        percentage of late metrics -> Write the result to the sink
>
> My question is: can the "Collect late metrics and find the percentage of
> late metrics" operator access the "MapState" that was updated by the main
> stream? Even though they are keyed by the same metric key, I guess they
> are two different tasks. I want to calculate (number of late metrics /
> (number of late metrics + number of metrics that arrived on time)).
>
> Thanks
> Suman
>


Flink application question

2021-08-09 Thread suman shil
I am writing a Flink application which consumes time series data from a
Kafka topic. The time series data has components like a metric name, a tag
key-value pair, a timestamp and a value. I have created a tumbling window to
aggregate data based on a metric key (which is a combination of the metric
name, the key-value pair and the timestamp). Here is what the main stream
looks like:

Kafka source -> Flat Map which parses and emits Metric -> Key by metric
key -> Tumbling window of 60 seconds -> Aggregate the data -> Write to the
sink.
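The grouping done by the main pipeline can be illustrated without Flink. The following is a minimal plain-Java sketch, not Flink API code. It assumes epoch-aligned 60-second windows, which is what Flink's tumbling event-time windows use when no offset is given. It also uses a hypothetical `Metric` class with only a key and a timestamp. The sketch counts metrics per metric key and window, which is the grouping the keyed tumbling window performs before the aggregate runs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "Key by metric key -> Tumbling window of 60 seconds -> Aggregate".
// Not Flink code: it only mimics how events are grouped per (key, window).
class TumblingWindowCounts {

    static final long WINDOW_SIZE_MS = 60_000L;

    // Hypothetical parsed metric; 'key' stands in for the combined metric key.
    static final class Metric {
        final String key;
        final long timestampMs;

        Metric(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window containing the timestamp (epoch-aligned, offset 0).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    // Counts metrics per "key@windowStart", i.e. one counter per key and window.
    static Map<String, Integer> countPerWindow(List<Metric> metrics) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Metric m : metrics) {
            String slot = m.key + "@" + windowStart(m.timestampMs, WINDOW_SIZE_MS);
            counts.merge(slot, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Metric> metrics = List.of(
                new Metric("cpu", 5_000L),
                new Metric("cpu", 59_999L),
                new Metric("cpu", 60_000L),
                new Metric("mem", 10_000L));
        System.out.println(countPerWindow(metrics));
    }
}
```

Running `main` prints `{cpu@0=2, cpu@60000=1, mem@0=1}`. The events at 5 s and 59.999 s share the first window, while the event at exactly 60 s falls into the next one.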

I also want to check whether any metrics arrived late, outside the above
window. I want to count how many metrics arrived late and calculate the
percentage of late metrics compared to the original metrics. I am thinking
of using the "allowedLateness" feature of Flink to send the late metrics to
a different stream. I am planning to add a "MapState" to the main
"Aggregate the data" operator, with the metric key as the key and the count
of the metrics that arrived in the main window as the value.
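The two parts of Flink's window API play different roles here. `allowedLateness(...)` controls how long a window keeps accepting late events, and each such event can cause a late firing of the window. `sideOutputLateData(outputTag)` routes events that arrive after that point to a side output, which is read back with `getSideOutput(outputTag)`. The rule can be modeled in plain Java as shown below. This is a simplified sketch, not Flink code. The `LateDataRouting` class is hypothetical. The sketch also assumes a bounded-out-of-orderness watermark that advances after every event, whereas Flink emits watermarks periodically.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how events end up either in a window or in a late-data side output.
class LateDataRouting {

    static final long WINDOW_SIZE_MS = 60_000L;

    // Events the window accepts vs. events a late-data side output would receive.
    static final class Routed {
        final List<Long> accepted = new ArrayList<>();
        final List<Long> late = new ArrayList<>();
    }

    // Last timestamp covered by the 60 s tumbling window containing ts (window end - 1).
    static long windowMaxTimestamp(long ts) {
        long start = ts - Math.floorMod(ts, WINDOW_SIZE_MS);
        return start + WINDOW_SIZE_MS - 1;
    }

    // An event is treated as late once its window's max timestamp
    // plus the allowed lateness is <= the current watermark.
    static boolean isLate(long ts, long watermark, long allowedLatenessMs) {
        return windowMaxTimestamp(ts) + allowedLatenessMs <= watermark;
    }

    // Routes events in arrival order. Assumption: watermark = max seen timestamp
    // - out-of-orderness bound - 1, updated after every event (Flink emits it periodically).
    static Routed route(List<Long> arrivalOrder, long outOfOrdernessMs, long allowedLatenessMs) {
        Routed out = new Routed();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : arrivalOrder) {
            if (isLate(ts, watermark, allowedLatenessMs)) {
                out.late.add(ts);
            } else {
                out.accepted.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - outOfOrdernessMs - 1;
        }
        return out;
    }
}
```

Consider events arriving at 10 s, 70 s, 130 s, 30 s and 65 s, with a 5 s out-of-orderness bound. With zero allowed lateness, the last two events are routed as late. With 10 s of allowed lateness, only the event at 30 s is routed as late.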


Kafka source -> Flat Map which parses and emits Metric -> Key by metric key
-> Tumbling window of 60 seconds -> Aggregate the data (maintain a map
state of metric counts) -> Write to the sink
     \
      \
       Late data -> Key by metric key -> Collect late metrics and find the
       percentage of late metrics -> Write the result to the sink

My question is: can the "Collect late metrics and find the percentage of
late metrics" operator access the "MapState" that was updated by the main
stream? Even though they are keyed by the same metric key, I guess they
are two different tasks. I want to calculate (number of late metrics /
(number of late metrics + number of metrics that arrived on time)).

Thanks
Suman