Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-30 Thread lorenzo . affetti
Thank you for your efforts!
I think Qingsheng expressed my concerns about discrepancies between the event
time from the external system and the current system time in the Flink cluster
more concretely. Thank you for the catch!

The FLIP definitely makes sense, and I would love to see your fixes soon!

Thanks


Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-29 Thread jialiang tan
Thanks Qingsheng for the patient guidance! And sorry to everyone for
wasting your time.

I suddenly realised that my implementation is wrong.

In my implementation, FetchTime and EventTime do not come from the same
record. That is a stupid mistake on my part :(

I need some time to think about it and will start a new discussion when I
am ready.

Best,
JiaLiang.


Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-28 Thread Qingsheng Ren
Hi Jialiang,

Thanks for the FLIP! Here are some thoughts of mine.

- For currentFetchEventTimeLag:

The problem with currentFetchEventTimeLag is that FetchTime is determined in
the SplitReader, driven by the SplitFetcher thread, while EventTime is
determined at the output of the SourceOperator, driven by the task's main
thread [1]. There is a queue (the elementQueue) between the two threads, so
it is hard to calculate FetchTime - EventTime accurately across them. I
assume the new method "recordFetched()" in SourceReaderMetricGroup can only
be invoked in the SplitReader when records are being fetched from the
external system, and this will introduce concurrency issues because the
event time is determined in a different thread.
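To make the cross-thread problem concrete, here is a minimal single-threaded simulation of one possible interleaving, assuming a simplified model with hypothetical names (this is not Flink's actual reader code): the fetch side overwrites a shared `lastFetchTime` on every fetch and enqueues the record, and the emit side later pairs the dequeued record's event time with whatever fetch time is currently stored. Once a newer record has been fetched, the computed lag mixes two different records.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the naive approach; names are illustrative only.
public class SharedFetchTimeRace {
    // Overwritten on every fetch (fetcher side), read on emit (main-thread side).
    private long lastFetchTime;
    // Stand-in for the elementQueue that separates the two threads.
    private final Queue<Long> eventTimes = new ArrayDeque<>();

    // Fetcher side: a record with the given event time is fetched at time `now`.
    void fetch(long eventTime, long now) {
        lastFetchTime = now;
        eventTimes.add(eventTime);
    }

    // Main-thread side: emit the next record and compute FetchTime - EventTime
    // from the shared field, which may already describe a newer record.
    long emitAndComputeNaiveLag() {
        long eventTime = eventTimes.remove();
        return lastFetchTime - eventTime;
    }

    public static void main(String[] args) {
        SharedFetchTimeRace reader = new SharedFetchTimeRace();
        reader.fetch(900, 1_000);   // record A: true fetch lag is 100 ms
        reader.fetch(1_950, 2_000); // record B is fetched before A is emitted
        System.out.println(reader.emitAndComputeNaiveLag()); // 1100, not 100
    }
}
```

In a real reader the two sides run on different threads, so the mismatch depends on timing rather than being deterministic as in this simulation.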

One possible solution I have in mind is for records to carry their own
FetchTime all the way until they reach the SourceOutput, where their event
time is extracted by the TimestampAssigner; then we can calculate
FetchTime - EventTime accurately. This requires some changes to the
SourceReaderBase API.
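A minimal sketch of that idea, assuming hypothetical names throughout (`FetchedRecord`, `EventTimeExtractor` and `FetchLagTracker` are illustrative, not Flink APIs; `EventTimeExtractor` only mimics the role of Flink's `TimestampAssigner`): the fetch time travels with the record through the queue, so the lag is always computed from a single record even though fetching and emitting happen on different threads.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PerRecordFetchLag {
    // Hypothetical wrapper: the fetcher stamps each record with its own fetch time.
    static final class FetchedRecord<T> {
        final T value;
        final long fetchTime;

        FetchedRecord(T value, long fetchTime) {
            this.value = value;
            this.fetchTime = fetchTime;
        }
    }

    // Hypothetical stand-in for event-time extraction (the TimestampAssigner's role).
    interface EventTimeExtractor<T> {
        long extract(T value);
    }

    // Emit side: computes FetchTime - EventTime for the record being emitted.
    static final class FetchLagTracker<T> {
        private final EventTimeExtractor<T> extractor;
        // volatile so that a metrics reporter thread sees the latest value.
        private volatile long currentFetchEventTimeLag;

        FetchLagTracker(EventTimeExtractor<T> extractor) {
            this.extractor = extractor;
        }

        T emit(FetchedRecord<T> record) {
            long eventTime = extractor.extract(record.value);
            currentFetchEventTimeLag = record.fetchTime - eventTime;
            return record.value;
        }

        long getCurrentFetchEventTimeLag() {
            return currentFetchEventTimeLag;
        }
    }

    public static void main(String[] args) {
        Queue<FetchedRecord<Long>> elementQueue = new ArrayDeque<>();
        // Records are plain event timestamps here, so the extractor is the identity.
        FetchLagTracker<Long> tracker = new FetchLagTracker<>(v -> v);
        elementQueue.add(new FetchedRecord<>(900L, 1_000L));
        elementQueue.add(new FetchedRecord<>(1_950L, 2_000L));
        tracker.emit(elementQueue.remove());
        System.out.println(tracker.getCurrentFetchEventTimeLag()); // 100
    }
}
```

With the same two records as in the naive model, the lag for the first record is its own 100 ms, regardless of how far the fetcher has run ahead.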

- For currentProcessingTime:

The name is a bit confusing to me, as it is quite similar to the
"processing time" concept in stream processing [2]. I also have some
concerns about this new metric: I think it can be derived directly from two
existing metrics (currentEmitEventTimeLag - currentFetchEventTimeLag), so
is it necessary to introduce a new one? This subtraction should be very
easy to perform in an external metric monitoring system.

Best,
Qingsheng

[1]
https://github.com/apache/flink/blob/b1544e4e513d2b75b350c20dbb1c17a8232c22fd/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java#L107
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/#notions-of-time-event-time-and-processing-time


Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-26 Thread jialiang tan
Thanks, and I'm glad to see your reply.

> I could not find the precise comment in the thread you are referring to
> unfortunately. Do you have some specific point in the discussion in mind?

Sorry for the misdirection. Are you worried about the time zone and NTP
problems? Here are some of my personal thoughts; please correct me if I'm
wrong.

Regarding NTP, I have consulted an ops colleague at my company: clock
synchronization is guaranteed by the sysadmins, so developers usually do not
need to consider this problem.
Regarding the time zone issue, I think consuming data across time zones
shouldn't happen (e.g. data produced in Singapore and consumed by Flink
in China); perhaps, as you said, providing this metric should come with
disclaimers to the user.
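To make the time zone question concrete, here is a minimal sketch under two illustrative assumptions: event times stored as epoch milliseconds denote the same instant everywhere, so only event times written as zone-less local date-time strings can be affected; and the two zones below are arbitrary examples. If the consumer interprets such a string in a different zone than the producer meant, every lag computed from it is shifted by the difference between the zones' UTC offsets.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class TimeZoneSkewDemo {
    // Interprets a zone-less local date-time in the given zone and returns epoch millis.
    static long toEpochMillis(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Hypothetical event time written by the producer as local wall-clock time.
        LocalDateTime written = LocalDateTime.of(2024, 4, 26, 12, 0);
        long asProducerMeant = toEpochMillis(written, ZoneId.of("Asia/Shanghai"));    // UTC+8
        long asConsumerAssumed = toEpochMillis(written, ZoneId.of("Europe/Berlin"));  // UTC+2 (summer time)
        // Any lag computed from the misread event time is off by this amount (6 hours).
        System.out.println(asConsumerAssumed - asProducerMeant); // 21600000
    }
}
```

Event times produced as epoch timestamps do not have this problem; for them only the clock skew discussed under NTP remains.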

If we have to take the above issues into account, I think it is hard for us
to implement metrics like `currentEmitEventTimeLag` and
`currentFetchEventTimeLag`.
If we set the NTP and time zone issues aside, then
`currentFetchEventTimeLag` and `currentEmitEventTimeLag` are actually a
pretty good reflection of the current consumption latency from the external
DB/MQ.

> However, why reflection?

Good question, thanks for raising it! Yes, at first it was just a
workaround. But I found that the flink-connector-kafka documentation [1]
says "There is no connector (yet) available for Flink version 1.19", and
after thinking about it more, it might be better to add support for this
feature once flink-connector-kafka upgrades its Flink version. WDYT?
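A minimal sketch of the reflection workaround being discussed, assuming the proposed method is a public no-argument `recordFetched()` (the FLIP's exact signature is not shown in this thread) and using hypothetical stand-in classes rather than Flink's `SourceReaderMetricGroup`: the connector looks the method up once and calls it only if the Flink version it runs against provides it.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class OptionalMetricCall {
    // Hypothetical stand-ins: an "old" metric group without recordFetched()
    // and a "new" one that has it.
    public static class OldMetricGroup {}

    public static class NewMetricGroup {
        public int fetchCount = 0;

        public void recordFetched() {
            fetchCount++;
        }
    }

    // Look the method up once (not per record); null means it is not available.
    static Method findRecordFetched(Class<?> clazz) {
        try {
            return clazz.getMethod("recordFetched");
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    // Invoke the method if it exists, otherwise do nothing.
    static boolean tryRecordFetched(Object metricGroup, Method method) {
        if (method == null) {
            return false;
        }
        try {
            method.invoke(metricGroup);
            return true;
        } catch (IllegalAccessException | InvocationTargetException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        NewMetricGroup newer = new NewMetricGroup();
        System.out.println(tryRecordFetched(newer, findRecordFetched(NewMetricGroup.class))); // true
        System.out.println(newer.fetchCount); // 1
        System.out.println(tryRecordFetched(new OldMetricGroup(),
                findRecordFetched(OldMetricGroup.class))); // false
    }
}
```

Once the connector compiles against a Flink version that has the method, the reflective lookup can be replaced by a direct call, which is the point behind the question about bumping the Flink version.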

> Is this some pseudo-code or an actual implementation? This should invoke
> something like `System.nanoTime()` and not `System.currentTimeMillis()`
> because of precision/accuracy reasons [1]

This is the actual implementation, and it has been running well at my
company for months. Also, `currentEmitEventTimeLag` currently uses
`System.currentTimeMillis()`, and in my opinion `currentFetchEventTimeLag`
should match it.
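As a sketch of the distinction behind this choice: an event-time lag subtracts an epoch timestamp from wall-clock time, so it needs `System.currentTimeMillis()` (or an equivalent absolute clock), whereas `System.nanoTime()` has an arbitrary origin and is only meaningful for elapsed durations measured within one JVM, which would fit a processing-time style metric instead. The class and method names are illustrative.

```java
import java.util.concurrent.TimeUnit;

public class LagClocks {
    // Event-time lag: both operands are epoch milliseconds, so wall-clock time is required.
    static long eventTimeLagMillis(long eventTimeEpochMillis) {
        return System.currentTimeMillis() - eventTimeEpochMillis;
    }

    // Elapsed time: System.nanoTime() has an arbitrary origin, so only the
    // difference between two readings taken in the same JVM is meaningful.
    static long elapsedMillisSince(long startNanos) {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        long eventTime = System.currentTimeMillis() - 5_000; // a record produced about 5 s ago
        Thread.sleep(20);
        System.out.println("event-time lag ms: " + eventTimeLagMillis(eventTime));
        System.out.println("elapsed ms: " + elapsedMillisSince(start));
    }
}
```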

Best,
TanJiaLiang.

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/kafka/#dependencies



Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-26 Thread lorenzo . affetti
Sorry for some ill-informed questions, and thanks for adding context here and
to the FLIP.

About 1:
> I think this has been discussed in the FLIP-33 lists thread[2].

Unfortunately, I could not find the precise comment in the thread you are
referring to. Do you have some specific point in the discussion in mind?
However, I understand that those metrics already exist and you are making them
available now :)

About 2:
Good :)

About 3:
Totally agree with you, I like this approach, looks very consistent.

About 4:
Yeah, it is already there :)

About 5:
Thank you for providing the example, now it is clearer.
However, why reflection? Is this only a workaround so that the current Kafka
connector can invoke `recordFetched` with a newer version of Flink without
bumping its Flink version? Once the Flink version is bumped, the method will be
available at compile time through `SourceReaderMetricGroup`. Can you comment
on this?

About 6:
Got it, thank you.

As a last comment, in the implementation I can see:
       lastFetchTime = clock.absoluteTimeMillis();

Is this some pseudo-code or an actual implementation?
This should invoke something like `System.nanoTime()` and not 
`System.currentTimeMillis()` because of precision/accuracy reasons [1]

[1] 
https://stackoverflow.com/questions/351565/system-currenttimemillis-vs-system-nanotime

Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-24 Thread jialiang tan
Hi lorenzo, thanks for your feedback!


> There can be major discrepancies between the absolute time got by the
> TaskManagers (if clocks are not synchronized via ntp for example), and the
> results of the metric might be quite distant for different TMs.


I don't know much about this. Can more experienced contributors help? If the
problem exists, perhaps `currentEmitEventTimeLag` has the same problem?

> Furthermore, comparing the time on the Flink cluster with the event time of
> records might introduce completely inaccurate results.


I think this has been discussed in the FLIP-33 lists thread[2].

> 2 - I don't think the name `processingLag` represents the processing time
> spent, I would rather see `processingTime` just for the semantics of the
> name itself.


+1, it makes sense to me.

> 3 - Do you really think the `processingTime` should be a gauge? I
> understand your justification for the fetch lag, but I think the processing
> time should be a histogram. For the inefficiency of this, how about some
> sampling (e.g.: only update the histogram once every 1000 events?)


I went back to the FLIP-33 mailing-list thread [2]. At first,
`emitEventTimeLag` and `fetchEventTimeLag` were defined as histograms, but
histograms were considered too expensive, so `currentFetchEventTimeLag` and
`currentEmitEventTimeLag` were introduced instead, and `fetchEventTimeLag`
and `emitEventTimeLag` were left as future work. I think we need another
FLIP to discuss them. For now, I want to rename `processingTime` to
`currentProcessingTime`, still as a gauge, and leave a histogram-based
`processingTime` as future work. WDYT?
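A minimal sketch of the trade-off discussed here, assuming a single writer thread and hypothetical class names (these are not Flink's metric classes, and a plain list stands in for a real histogram): a gauge only stores the latest value, whereas the sampling idea keeps a histogram affordable by recording only one in every N values.

```java
import java.util.ArrayList;
import java.util.List;

public class GaugeVsSampledHistogram {
    // Gauge-style: keep only the latest value; O(1) per record.
    static final class CurrentValueGauge {
        // volatile so that a metrics reporter thread sees the latest value.
        private volatile long value;

        void update(long newValue) {
            value = newValue;
        }

        long getValue() {
            return value;
        }
    }

    // Sampled "histogram": records one in every `sampleEvery` values.
    // Assumes a single writer thread; the list stands in for a histogram.
    static final class SampledRecorder {
        private final int sampleEvery;
        private long seen;
        private final List<Long> samples = new ArrayList<>();

        SampledRecorder(int sampleEvery) {
            this.sampleEvery = sampleEvery;
        }

        void update(long newValue) {
            seen++;
            if (seen % sampleEvery == 0) {
                samples.add(newValue);
            }
        }

        List<Long> getSamples() {
            return samples;
        }
    }

    public static void main(String[] args) {
        CurrentValueGauge gauge = new CurrentValueGauge();
        SampledRecorder sampled = new SampledRecorder(1_000);
        for (long i = 1; i <= 2_500; i++) {
            gauge.update(i);   // e.g. a per-record processing time in ms
            sampled.update(i);
        }
        System.out.println(gauge.getValue());     // 2500
        System.out.println(sampled.getSamples()); // [1000, 2000]
    }
}
```

The gauge answers "what is the value right now", while even a sampled histogram can answer distribution questions (percentiles), at the cost of extra state and updates.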

> 4 - At this point, if we have the processing time and number of records, we
> could also add throughput as a metric, so that the user would know how many
> records/second the source is able to produce.


Do you mean `numRecordsInPerSecond` and `numRecordsOutPerSecond`? They were
defined in FLIP-33 [1] and are already in use as of Flink 1.14.
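For reference, a per-second rate such as `numRecordsInPerSecond` is conceptually the change in a record counter divided by the elapsed time. A minimal sketch of that arithmetic, with a hypothetical helper (Flink's own meter implementation is not reproduced here):

```java
public class RateFromCounter {
    // Hypothetical helper: average records/second between two counter snapshots.
    static double perSecond(long countStart, long countEnd, long startMillis, long endMillis) {
        long elapsedMillis = endMillis - startMillis;
        if (elapsedMillis <= 0) {
            return 0.0; // no time has passed; avoid dividing by zero
        }
        return (countEnd - countStart) * 1000.0 / elapsedMillis;
    }

    public static void main(String[] args) {
        // 12,000 records counted over a 60-second window.
        System.out.println(perSecond(3_000, 15_000, 0, 60_000)); // 200.0
    }
}
```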

> 5 - For the "Kafka Connector" section: can this be generalized for
> connectors in general? Can you provide an example to better understand your
> statement about reflection?


Good idea, I have extended my FLIP.

> 6 - Does this introduce any UI change for representing the metric?


I'm only adding some new metrics; users can get them from the REST API or
collect them into Prometheus. There is no change to the Flink Web UI.
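As an illustration of the REST route, a minimal sketch that only builds the query URL, assuming the default REST port 8081 and Flink's `/jobs/:jobid/vertices/:vertexid/metrics` endpoint with its `get` parameter for selecting metrics. The job ID, vertex ID and metric IDs below are placeholders; the exact metric ID strings for a source vary, so it is safest to list them first by calling the same endpoint without `get`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

public class SourceMetricsQuery {
    // Builds a vertex-metrics query URL, selecting metrics via the `get` parameter.
    static String vertexMetricsUrl(String restBase, String jobId, String vertexId, List<String> metricIds) {
        String selected = metricIds.stream()
                .map(id -> URLEncoder.encode(id, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return restBase + "/jobs/" + jobId + "/vertices/" + vertexId + "/metrics?get=" + selected;
    }

    public static void main(String[] args) {
        // Placeholder IDs: take real ones from /jobs and from the unfiltered metrics list.
        String url = vertexMetricsUrl("http://localhost:8081", "JOB_ID", "VERTEX_ID",
                List.of("FETCH_LAG_METRIC_ID", "EMIT_LAG_METRIC_ID"));
        System.out.println(url);
    }
}
```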

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics
[2] https://lists.apache.org/thread/r47zrqto4k8tsc3xvfg392zblfx6dczl

Best,
TanJiaLiang

 于2024年4月24日周三 15:02写道:

> Hello jialiang tan, and thank you for your contribution!
>
> Here are my questions:
>
> 1 - I don't know if exposing this as metric to the user could create some
> harm. There can be major discrepancies between the absolute time got by the
> TaskManagers (if clocks are not synchronized via ntp for example), and the
> results of the metric might be quite distant for different TMs.
> Furthermore, comparing the time on the Flink cluster with the event time of
> records might introduce completely inaccurate results. I think providing
> this should come with many disclaimers to the user. Maybe, more experienced
> contributors can comment on this as well.
>
> 2 - I don't think the name `processingLag` represents the processing time
> spent, I would rather see `processingTime` just for the semantics of the
> name itself.
>
> 3 - Do you really think the `processingTime` should be a gauge? I
> understand your justification for the fetch lag, but I think the processing
> time should be an histogram. For the inefficiency of this, how about some
> sampling (e.g.: only update the histogram 1 every 1000 events?)
>
> 4 - At this point, if we have the processing time and number of records,
> we could also add throughput as a metric, so that the user would know how
> many records/second the source is able to produce.
>
> 5 - For the "Kafka Connector" section: can this be generalized for
> connectors in general? Can you provide an example to better understand your
> statement about reflection?
>
> 6 - Does this introduce any UI change for representing the metric?
>
> Thank you!
> On Apr 22, 2024 at 12:26 +0200, jialiang tan ,
> wrote:
> > Sorry all, it seems bad formatting in my email message, now I send it
> again
> > gently and hope it work.
> >
> > I would like to start a discussion about FLIP-XXX:
> > SupportcurrentFetchEventTimeLag and processingLag metrics [1].
> >
> > The main motivation for this change was that I had some difficulties
> > inimplementing the currentFetchEventTimeLag metrics for KafkaSource [2].
> >
> > So I proposed to let the SourceReaderMetricGroup provide an interface to
> > capturethe FetchTime so that all the FLIP-27 [3] sources can easily
> > implement thecurrentFetchEventTimeLag metrics.
> >
> > In addition, I propose to support the processingLag metric for the
> > FLIP-27sources to measure the current processing latency of the source.
> >
> > See the FLIP [1] and Jira 

Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-24 Thread lorenzo . affetti
Hello jialiang tan, and thank you for your contribution!

Here are my questions:

1 - I'm not sure whether exposing this as a metric to the user could do some
harm. There can be major discrepancies between the absolute times obtained by
the TaskManagers (if clocks are not synchronized via NTP, for example), and
the results of the metric might differ considerably across TMs. Furthermore,
comparing the time on the Flink cluster with the event time of records might
produce completely inaccurate results. I think providing this should come
with many disclaimers to the user. Maybe more experienced contributors can
comment on this as well.

2 - I don't think the name `processingLag` represents the processing time 
spent, I would rather see `processingTime` just for the semantics of the name 
itself.

3 - Do you really think the `processingTime` should be a gauge? I understand 
your justification for the fetch lag, but I think the processing time should be 
a histogram. For the inefficiency of this, how about some sampling (e.g.: only 
update the histogram once every 1000 events?)

4 - At this point, if we have the processing time and number of records, we 
could also add throughput as a metric, so that the user would know how many 
records/second the source is able to produce.

5 - For the "Kafka Connector" section: can this be generalized for connectors 
in general? Can you provide an example to better understand your statement 
about reflection?

6 - Does this introduce any UI change for representing the metric?
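To make the clock concern in point 1 concrete, assume (for illustration only) that a TaskManager's clock differs from true time by δ_TM and that the clock of the producer that stamped the event time differs by δ_P. The lag that TaskManager reports then absorbs both offsets:

```latex
\begin{aligned}
\text{lag}_{\text{observed}}
  &= (t_{\text{now}} + \delta_{\text{TM}}) - (t_{\text{event}} + \delta_{\text{P}}) \\
  &= \text{lag}_{\text{true}} + \delta_{\text{TM}} - \delta_{\text{P}},
  \qquad \text{lag}_{\text{true}} = t_{\text{now}} - t_{\text{event}}
\end{aligned}
```

So two TaskManagers with different offsets can report different lags for the same record, and a large enough skew can even make the reported lag negative.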

Thank you!


Re: [DISCUSS] FLIP-XXX Support currentFetchEventTimeLag and processingLag metrics

2024-04-22 Thread jialiang tan
Sorry all, the formatting of my previous email seems to have been broken, so
I am sending it again and hope it works this time.

I would like to start a discussion about FLIP-XXX:
Support currentFetchEventTimeLag and processingLag metrics [1].

The main motivation for this change was that I had some difficulties
implementing the currentFetchEventTimeLag metric for KafkaSource [2].

So I propose letting the SourceReaderMetricGroup provide an interface to
capture the FetchTime, so that all FLIP-27 [3] sources can easily
implement the currentFetchEventTimeLag metric.

In addition, I propose to support the processingLag metric for
FLIP-27 sources to measure the current processing latency of the source.

See the FLIP [1] and Jira [2] for more details.

Looking forward to your comments and opinions!

Thanks,
TanJiaLiang.

[1]
https://docs.google.com/document/d/1nPhh1A-v-a7zyQyl1A5-K5DeUqbfxNXdjr2TVBT-QMs/edit?usp=sharing
[2] https://issues.apache.org/jira/browse/FLINK-33173
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
