(No subject)

2023-06-12 Thread Paul
When defining a state variable in a Flink process function, e.g. private ValueState vs;,
does this state variable need to be declared transient, and why? More generally, in which
cases does a field in Flink code need the transient modifier, and in which cases not?
Any guidance would be appreciated.
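For context, a minimal sketch of the pattern the question describes (illustrative only;
the class and field names are invented, and this is not the list's answer): the state
handle is obtained from the runtime context in open() rather than being Java-serialized
with the function, which is why such fields are commonly declared transient.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountFunction extends KeyedProcessFunction<String, String, Long> {

    // The handle is re-created in open() on every task instance, so it is usually
    // declared transient; it is never meant to be serialized with the function.
    private transient ValueState<Long> vs;

    @Override
    public void open(Configuration parameters) throws Exception {
        vs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
        Long current = vs.value();
        long next = (current == null ? 0L : current) + 1;
        vs.update(next);
        out.collect(next);
    }
}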




Re: Async IO operator to write to DB

2023-06-12 Thread Karthik Deivasigamani
Thanks Martijn, the documentation for Async IO also indicated the same, and that's
what prompted me to post this question here.
~
Karthik

On Mon, Jun 12, 2023 at 7:45 PM Martijn Visser 
wrote:

> Hi Karthik,
>
> In my opinion, it makes more sense to use a sink to leverage Scylla over
> using Async IO. The primary use case for Async IO is enrichment, not for
> writing to a sink.
>
> Best regards,
>
> Martijn
>
> On Mon, Jun 12, 2023 at 4:10 PM Karthik Deivasigamani 
> wrote:
>
>> Thanks Martijn for your response.
>> One thing I did not mention was that we are in the process of moving away
>> from Cassandra to Scylla and would like to use the Scylla Java Driver for
>> the following reason :
>>
>>> The Scylla Java driver is shard aware and contains extensions for a
>>> tokenAwareHostPolicy. Using this policy, the driver can select a
>>> connection to a particular shard based on the shard’s token. As a result,
>>> latency is significantly reduced because there is no need to pass data
>>> between the shards.
>>>
>> We were considering writing our own Sink to leverage Scylla Java Driver
>> once the migration is done.
>> ~
>> Karthik
>>
>>
>> On Mon, Jun 12, 2023 at 4:56 PM Martijn Visser 
>> wrote:
>>
>>> Hi,
>>>
>>> Why wouldn't you just use the Flink Kafka connector and the Flink
>>> Cassandra connector for your use case?
>>>
>>> Best regards,
>>>
>>> Martijn
>>>
>>> On Mon, Jun 12, 2023 at 12:03 PM Karthik Deivasigamani <
>>> karthi...@gmail.com> wrote:
>>>
 Hi,
I have a use case where I need to read messages from a Kafka topic,
 parse it and write it to a database (Cassandra). Since Cassandra supports
 async APIs I was considering using Async IO operator for my writes. I do
 not need exactly-once semantics for my use-case.
 Is it okay to leverage the Async IO operator as a Sink (writing data
 into a DB)?
 ~
 Karthik

>>>
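For reference, a minimal sketch of the sink-based approach recommended above
(illustrative only; the async client below is a stand-in for a real shard-aware
Scylla/Cassandra driver session, not an actual driver API):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

public class CustomSinkSketch {

    // Stand-in for an asynchronous, shard-aware driver session; a real session
    // would be created in open() and closed in close().
    public static class FakeAsyncDb implements Serializable {
        CompletableFuture<Void> insertAsync(String row) {
            return CompletableFuture.runAsync(() -> System.out.println("wrote " + row));
        }
    }

    public static class DbSink extends RichSinkFunction<String> {
        private transient FakeAsyncDb db;

        @Override
        public void open(Configuration parameters) {
            db = new FakeAsyncDb();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Kept simple: wait for each write. A production sink would batch
            // requests and flush pending futures on checkpoint for at-least-once.
            db.insertAsync(value).get();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").addSink(new DbSink());
        env.execute("custom-sink-sketch");
    }
}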


Flink bulk and record file source format metrics

2023-06-12 Thread Kamal Mittal via user
Hello,

I am using the Flink record stream format file source API as below to read Parquet
records:

FileSource.FileSourceBuilder source =
FileSource.forRecordStreamFormat(streamformat, path);
source.monitorContinuously(Duration.ofMillis(1));

I want to log/generate metrics for corrupt records, and for that I need to register
Flink metrics at the source level, inside the Parquet reader class. Is there any way to
do that, given that no handle to a SourceContext is available there?

Rgds,
Kamal
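One possible workaround, sketched here as an illustration rather than a confirmed
answer from the list: if the StreamFormat reader exposes no metric handle, the
corrupt-record count can instead be taken in a rich function chained directly after the
source, where the operator metric group is available. The corrupt-record check below is
a placeholder.

import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Drops corrupt records and counts them; chain this right after the FileSource stream.
public class CorruptRecordFilter extends RichFilterFunction<String> {

    private transient Counter corruptRecords;

    @Override
    public void open(Configuration parameters) {
        corruptRecords = getRuntimeContext()
                .getMetricGroup()
                .counter("numCorruptRecords");
    }

    @Override
    public boolean filter(String record) {
        boolean corrupt = record == null || record.isEmpty(); // placeholder check
        if (corrupt) {
            corruptRecords.inc();
        }
        return !corrupt;
    }
}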


Re: Flink SQL job metric names are so long they blow up Prometheus memory

2023-06-12 Thread Feng Jin
hi casel

1. Consider using Flink 1.15 and enabling simplified operator names (a configuration
sketch follows below):

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-simplify-operator-name-enabled

2. Flink also provides a REST API for fetching instantaneous metrics directly, if you do
not need historical metrics:

https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobmanager-metrics
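As an illustrative sketch of item 1 (not part of the original message; the option key is
taken from the linked page), the option can be set on the TableEnvironment before the
SQL job is built:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SimplifyOperatorNames {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Available from Flink 1.15 on; shortens the generated operator names that
        // would otherwise embed the full SQL text.
        tEnv.getConfig().getConfiguration()
                .setBoolean("table.exec.simplify-operator-name-enabled", true);

        // ... register catalogs/tables and submit the SQL job as usual ...
    }
}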


Best,
Feng

On Tue, Jun 13, 2023 at 8:51 AM casel.chen  wrote:

> We run 200+ Flink SQL jobs in production. After hooking them up to Prometheus (which
> scrapes job metrics periodically), Prometheus memory (64 GB) was exhausted shortly
> after; investigation showed that overly long metric names were the cause.
> A Flink SQL job's metric names are usually composed of the job name plus the operator
> name, and the operator name is assembled from the SQL text, so with many SELECT fields
> or complex SQL the generated names easily become very long.
> Is there a good way to solve this?


Flink SQL job metric names are so long they blow up Prometheus memory

2023-06-12 Thread casel.chen
We run 200+ Flink SQL jobs in production. After hooking them up to Prometheus (which
scrapes job metrics periodically), Prometheus memory (64 GB) was exhausted shortly
after; investigation showed that overly long metric names were the cause.
A Flink SQL job's metric names are usually composed of the job name plus the operator
name, and the operator name is assembled from the SQL text, so with many SELECT fields
or complex SQL the generated names easily become very long.
Is there a good way to solve this?

Re: Re: Latency metrics for each operator of a Flink job

2023-06-12 Thread casel.chen
Thanks for the explanation. For a Flink SQL job, how do I get the latency metric of each
operator in the job?
Also, when collecting job metrics via Prometheus: the operator names of a Flink SQL job
are assembled from the SQL text, so they can become very long, and pushing such operator
metrics straight into Prometheus blows up its memory. Is there a good way to solve this?

On 2023-06-12 18:01:11, "Hangxiang Yu" wrote:
>[<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency
>This should cover the need? You can also configure different granularities.
>https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io
>
>On Mon, Jun 12, 2023 at 5:05 PM casel.chen  wrote:
>
>> I want to measure the per-operator latency of data flowing through a Flink job. Can
>> the open-source community version do this today?
>
>
>
>-- 
>Best,
>Hangxiang.


Re: Building Dynamic SQL using contents of Map datastructure

2023-06-12 Thread Shammon FY
Hi Yogesh,

I think you need to build the dynamic SQL statement in your service and then
submit the SQL to the Flink cluster.
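An illustrative sketch of that suggestion (not from the thread; the table and column
names are invented, and the target tables are assumed to be registered already):
assemble the column list from the Map on the client side and submit the finished
statement with executeSql.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

import java.util.LinkedHashMap;
import java.util.Map;

public class DynamicInsertSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical mapping of target columns to source expressions.
        Map<String, String> dataMap = new LinkedHashMap<>();
        dataMap.put("id", "id");
        dataMap.put("name", "UPPER(name)");

        String sql = "INSERT INTO sampleSink SELECT "
                + String.join(", ", dataMap.values())
                + " FROM Data";

        // The statement is fully built before it reaches Flink, so no UDF is needed to
        // produce the column list at runtime (assumes sampleSink and Data exist).
        tEnv.executeSql(sql);
    }
}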

Best,
Shammon FY

On Mon, Jun 12, 2023 at 9:15 PM Yogesh Rao  wrote:

> Hi,
>
> Is there a way we can build dynamic SQL in Flink from the contents of a Map?
>
> Essentially, I am trying to achieve something like the below:
>
> StringBuilder builder = new StringBuilder("INSERT INTO sampleSink SELECT "
> );
>
> builder.append("getColumnsFromMap(dataMap), ");
>
> builder.append(" FROM Data").toString();
>
>
> Here getColumnsFromMap is registered as a ScalarFunction which would return an
> array of Strings, basically the column names.
>
>
> Regards,
>
> -Yogesh
>


Running Into Error When Installing Apache Flink Using Python

2023-06-12 Thread Joseph, Chris S
Hi,

I tried to install the apache-flink Python package but am getting an error message,
and I am unsure how to resolve it, since I am able to install other packages
like pandas without any issues. Any help would be greatly appreciated.

vagrant@devbox:~$ pip install apache-flink
Defaulting to user installation because normal site-packages is not writeable
Collecting apache-flink
  Using cached apache_flink-1.17.1-cp310-cp310-manylinux1_x86_64.whl (5.7 MB)
Collecting requests>=2.26.0
  Using cached requests-2.31.0-py3-none-any.whl (62 kB)
Collecting apache-beam==2.43.0
  Using cached 
apache_beam-2.43.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl 
(14.0 MB)
Collecting pemja==0.3.0
  Using cached pemja-0.3.0-cp310-cp310-manylinux1_x86_64.whl (332 kB)
Collecting apache-flink-libraries<1.17.2,>=1.17.1
  Downloading apache-flink-libraries-1.17.1.tar.gz (240.2 MB)
 ━━━╸ 240.2/240.2 MB 19.0 MB/s eta 
0:00:01
ERROR: Exception:
Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/pip/_vendor/urllib3/response.py", line 
438, in _error_catcher
yield
  File "/usr/lib/python3/dist-packages/pip/_vendor/urllib3/response.py", line 
519, in read
data = self._fp.read(amt) if not fp_closed else b""
  File 
"/usr/lib/python3/dist-packages/pip/_vendor/cachecontrol/filewrapper.py", line 
90, in read
data = self.__fp.read(amt)
  File "/usr/lib/python3.10/http/client.py", line 465, in read
s = self.fp.read(amt)
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
return self._sock.recv_into(b)
  File "/usr/lib/python3.10/ssl.py", line 1274, in recv_into
return self.read(nbytes, buffer)
  File "/usr/lib/python3.10/ssl.py", line 1130, in read
return self._sslobj.read(len, buffer)
TimeoutError: The read operation timed out

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/pip/_internal/cli/base_command.py", line 
165, in exc_logging_wrapper
status = run_func(*args)
  File "/usr/lib/python3/dist-packages/pip/_internal/cli/req_command.py", line 
205, in wrapper
return func(self, options, args)
  File "/usr/lib/python3/dist-packages/pip/_internal/commands/install.py", line 
339, in run
requirement_set = resolver.resolve(
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/resolver.py",
 line 94, in resolve
result = self._result = resolver.resolve(
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", 
line 481, in resolve
state = resolution.resolve(requirements, max_rounds=max_rounds)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", 
line 373, in resolve
failure_causes = self._attempt_to_pin_criterion(name)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", 
line 213, in _attempt_to_pin_criterion
criteria = self._get_updated_criteria(candidate)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", 
line 204, in _get_updated_criteria
self._add_to_criteria(criteria, requirement, parent=candidate)
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/resolvers.py", 
line 172, in _add_to_criteria
if not criterion.candidates:
  File "/usr/lib/python3/dist-packages/pip/_vendor/resolvelib/structs.py", line 
151, in __bool__
return bool(self._sequence)
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/found_candidates.py",
 line 155, in __bool__
return any(self)
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/found_candidates.py",
 line 143, in 
return (c for c in iterator if id(c) not in self._incompatible_ids)
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/found_candidates.py",
 line 47, in _iter_built
candidate = func()
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/factory.py",
 line 215, in _make_candidate_from_link
self._link_candidate_cache[link] = LinkCandidate(
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py",
 line 288, in __init__
super().__init__(
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py",
 line 158, in __init__
self.dist = self._prepare()
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py",
 line 227, in _prepare
dist = self._prepare_distribution()
  File 
"/usr/lib/python3/dist-packages/pip/_internal/resolution/resolvelib/candidates.py",
 line 299, in _prepare_distribution
return preparer.prepare_linked_requirement(self._ireq, parallel_builds=True)
  File "/usr/lib/python3/dist-packages/pip/_internal/operations/prepare.py", 
line 487, in prepare_linked_requirement
return self._prepare_linked_requirement(req, parallel_builds)
  File 

Re: Async IO operator to write to DB

2023-06-12 Thread Martijn Visser
Hi Karthik,

In my opinion, it makes more sense to use a sink to leverage Scylla over
using Async IO. The primary use case for Async IO is enrichment, not for
writing to a sink.

Best regards,

Martijn

On Mon, Jun 12, 2023 at 4:10 PM Karthik Deivasigamani 
wrote:

> Thanks Martijn for your response.
> One thing I did not mention was that we are in the process of moving away
> from Cassandra to Scylla and would like to use the Scylla Java Driver for
> the following reason :
>
>> The Scylla Java driver is shard aware and contains extensions for a
>> tokenAwareHostPolicy. Using this policy, the driver can select a
>> connection to a particular shard based on the shard’s token. As a result,
>> latency is significantly reduced because there is no need to pass data
>> between the shards.
>>
> We were considering writing our own Sink to leverage Scylla Java Driver
> once the migration is done.
> ~
> Karthik
>
>
> On Mon, Jun 12, 2023 at 4:56 PM Martijn Visser 
> wrote:
>
>> Hi,
>>
>> Why wouldn't you just use the Flink Kafka connector and the Flink
>> Cassandra connector for your use case?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Mon, Jun 12, 2023 at 12:03 PM Karthik Deivasigamani <
>> karthi...@gmail.com> wrote:
>>
>>> Hi,
>>>I have a use case where I need to read messages from a Kafka topic,
>>> parse it and write it to a database (Cassandra). Since Cassandra supports
>>> async APIs I was considering using Async IO operator for my writes. I do
>>> not need exactly-once semantics for my use-case.
>>> Is it okay to leverage the Async IO operator as a Sink (writing data
>>> into a DB)?
>>> ~
>>> Karthik
>>>
>>


Re: Async IO operator to write to DB

2023-06-12 Thread Karthik Deivasigamani
Thanks Martijn for your response.
One thing I did not mention was that we are in the process of moving away
from Cassandra to Scylla and would like to use the Scylla Java Driver for
the following reason :

> The Scylla Java driver is shard aware and contains extensions for a
> tokenAwareHostPolicy. Using this policy, the driver can select a
> connection to a particular shard based on the shard’s token. As a result,
> latency is significantly reduced because there is no need to pass data
> between the shards.
>
We were considering writing our own Sink to leverage Scylla Java Driver
once the migration is done.
~
Karthik


On Mon, Jun 12, 2023 at 4:56 PM Martijn Visser 
wrote:

> Hi,
>
> Why wouldn't you just use the Flink Kafka connector and the Flink
> Cassandra connector for your use case?
>
> Best regards,
>
> Martijn
>
> On Mon, Jun 12, 2023 at 12:03 PM Karthik Deivasigamani <
> karthi...@gmail.com> wrote:
>
>> Hi,
>>I have a use case where I need to read messages from a Kafka topic,
>> parse it and write it to a database (Cassandra). Since Cassandra supports
>> async APIs I was considering using Async IO operator for my writes. I do
>> not need exactly-once semantics for my use-case.
>> Is it okay to leverage the Async IO operator as a Sink (writing data into
>> a DB)?
>> ~
>> Karthik
>>
>


Building Dynamic SQL using contents of Map datastructure

2023-06-12 Thread Yogesh Rao
Hi,

Is there a way we can build dynamic SQL in Flink from the contents of a Map?

Essentially, I am trying to achieve something like the below:

StringBuilder builder = new StringBuilder("INSERT INTO sampleSink SELECT ");

builder.append("getColumnsFromMap(dataMap), ");

builder.append(" FROM Data").toString();


Here getColumnsFromMap is registered as a ScalarFunction which would return an
array of Strings, basically the column names.


Regards,

-Yogesh


Re: Flink source error handling

2023-06-12 Thread Martijn Visser
1. Like a regular Kafka client, so it depends on how you configure it.
2. Yes
3. It depends on the failure of course, but you could create something like
a Dead Letter Queue in case deserialization fails for your incoming message.
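An illustrative sketch of item 3 (not from the thread; the types and the corrupt-input
check are invented): raw payloads that fail to parse are routed to a side output, which
a real job would then write to a dead-letter topic.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class DeadLetterSketch {

    // Side output for raw payloads that could not be parsed.
    private static final OutputTag<String> DEAD_LETTERS = new OutputTag<String>("dead-letters") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the KafkaSource stream of raw strings.
        DataStream<String> raw = env.fromElements("1", "2", "not-a-number");

        SingleOutputStreamOperator<Integer> parsed = raw.process(
                new ProcessFunction<String, Integer>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<Integer> out) {
                        try {
                            out.collect(Integer.parseInt(value));
                        } catch (NumberFormatException e) {
                            // Parsing failed: send the raw payload to the dead-letter side output.
                            ctx.output(DEAD_LETTERS, value);
                        }
                    }
                });

        parsed.print();
        // A real job would replace print() with a Kafka sink to a dead-letter topic.
        parsed.getSideOutput(DEAD_LETTERS).print();

        env.execute("dead-letter-queue-sketch");
    }
}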

Best regards,

Martijn

On Sat, Jun 10, 2023 at 2:03 PM Anirban Dutta Gupta <
anir...@indicussoftware.com> wrote:

> Hello,
>
> Thanks for the guidance. We will surely think of moving to a newer version
> of Flink.
> Just a few followup questions when using KafkaSource..(sorry if I am being
> naive in my questions)
>
> 1. How does KafkaSource handle disruptions with the Kafka broker ? Will it
> keep on trying to connect and subscribe to the broker indefinitely or will
> it fail the Flink source after a certain number of retries ?
>
> 2. Will there be some log output in the Flink logs while the KafkaSource
> is trying again to connect to the broker after a disruption ?
>
> 3. In case of the source failing, is there a way in the Flink program
> using the KafkaSource to detect the error and add some error handling
> mechanism..for e.g. sending an alert mail to the stakeholders in case the
> source fails completely. (Something similar to
> "ActionRequestFailureHandler" for ElasticsearchSink)
>
> Many thanks in advance,
> Anirban
>
> On 09-06-2023 20:01, Martijn Visser wrote:
>
> Hi,
>
> This consumer should not be used. This only occurs in really old and no
> longer supported Flink versions. You should really upgrade to a newer
> version of Flink and use the KafkaSource.
>
> Best regards,
>
> Martijn
>
> On Fri, Jun 9, 2023 at 11:05 AM Anirban Dutta Gupta <
> anir...@indicussoftware.com> wrote:
>
>> Hello,
>>
>> We are using "FlinkKafkaConsumer011" as a Kafka source consumer for
>> Flink. Please guide on how to implement error handling mechanism for the
>> following:
>> 1. If the subscription to the Kafka topic gets lost, Kafka connection
>> gets disconnected.
>> In this case, is there any mechanism of re-subscribing to the Kafka
>> topic automatically in the program.
>>
>> 2. If there is any error in the FetchRecords of the consumer.
>>
>> Thanks and Regards,
>> Anirban
>>
>
>


Re: Fail to run flink 1.17 job with flink-operator 1.5.0 version

2023-06-12 Thread Gyula Fóra
Hi!

I think you forgot to upgrade the operator CRD (which contains the updated
enum values).

Please see:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/#1-upgrading-the-crd

Cheers
Gyula

On Mon, 12 Jun 2023 at 13:38, Liting Liu (litiliu) 
wrote:

> Hi,  I was trying to submit a flink 1.17 job with the
> flink-kubernetes-operator version v1.5.0.
> But encountered the below exception:
>
>
> The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion:
> Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15",
> "v1_16"
>
>
> I think the flink-operator should have supported flink 1.17, because as
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/
>  requires,
>  autoscaler only work well with flink 1.17.
>
>
>
>



Fail to run flink 1.17 job with flink-operator 1.5.0 version

2023-06-12 Thread Liting Liu (litiliu)
Hi, I was trying to submit a Flink 1.17 job with flink-kubernetes-operator
version v1.5.0.
But I encountered the below exception:


The FlinkDeployment "test-scale-z6t4cd" is invalid: spec.flinkVersion: 
Unsupported value: "v1_17": supported values: "v1_13", "v1_14", "v1_15", "v1_16"


I think the flink-operator should already support Flink 1.17, because as
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.5/docs/custom-resource/autoscaler/
 states, the autoscaler only works well with Flink 1.17.






Re: Async IO operator to write to DB

2023-06-12 Thread Martijn Visser
Hi,

Why wouldn't you just use the Flink Kafka connector and the Flink Cassandra
connector for your use case?

Best regards,

Martijn

On Mon, Jun 12, 2023 at 12:03 PM Karthik Deivasigamani 
wrote:

> Hi,
>I have a use case where I need to read messages from a Kafka topic,
> parse it and write it to a database (Cassandra). Since Cassandra supports
> async APIs I was considering using Async IO operator for my writes. I do
> not need exactly-once semantics for my use-case.
> Is it okay to leverage the Async IO operator as a Sink (writing data into
> a DB)?
> ~
> Karthik
>


Async IO operator to write to DB

2023-06-12 Thread Karthik Deivasigamani
Hi,
   I have a use case where I need to read messages from a Kafka topic,
parse them and write them to a database (Cassandra). Since Cassandra supports
async APIs I was considering using Async IO operator for my writes. I do
not need exactly-once semantics for my use-case.
Is it okay to leverage the Async IO operator as a Sink (writing data into a
DB)?
~
Karthik
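For comparison with the sink-based suggestion earlier in this digest, a minimal,
self-contained sketch of the pattern asked about here (the database call is simulated
with a CompletableFuture; a real job would call the driver's async API instead, and
error handling is omitted):

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AsyncWriteSketch {

    // Stand-in for an asynchronous database insert; a real driver returns its own future type.
    static CompletableFuture<Boolean> writeToDbAsync(String record) {
        return CompletableFuture.supplyAsync(() -> true);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> parsed = env.fromElements("a", "b", "c");

        // Async I/O used as a "write" step: each element triggers an async insert and
        // is forwarded once the future completes (errors are ignored in this sketch).
        DataStream<String> written = AsyncDataStream.unorderedWait(
                parsed,
                new RichAsyncFunction<String, String>() {
                    @Override
                    public void asyncInvoke(String value, ResultFuture<String> resultFuture) {
                        writeToDbAsync(value).whenComplete(
                                (ok, err) -> resultFuture.complete(Collections.singleton(value)));
                    }
                },
                5, TimeUnit.SECONDS, // timeout per request
                100);                // max in-flight requests

        written.print();
        env.execute("async-io-write-sketch");
    }
}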


Re: Latency metrics for each operator of a Flink job

2023-06-12 Thread Hangxiang Yu
[<source_id>.[<source_subtask_index>.]]<operator_id>.<operator_subtask_index>.latency
This should cover the need? You can also configure different granularities.
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/metrics/#io
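As an illustrative aside (not part of the original reply): these latency histograms only
appear once latency tracking is enabled, for example via the execution config.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Emit a LatencyMarker every 30 seconds; the resulting histograms are reported
        // under the latency scope shown above.
        env.getConfig().setLatencyTrackingInterval(30_000L);

        env.fromElements(1, 2, 3)
           .map(i -> i * 2).returns(Types.INT)
           .print();

        env.execute("latency-tracking-sketch");
    }
}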

On Mon, Jun 12, 2023 at 5:05 PM casel.chen  wrote:

> I want to measure the per-operator latency of data flowing through a Flink job. Can
> the open-source community version do this today?



-- 
Best,
Hangxiang.


Latency metrics for each operator of a Flink job

2023-06-12 Thread casel.chen
I want to measure the per-operator latency of data flowing through a Flink job. Can the
open-source community version do this today?

Re: Unsubscribe

2023-06-12 Thread Hang Ruan
Please send an email to user-unsubscr...@flink.apache.org to unsubscribe

Best,
Hang

Yu voidy wrote on Mon, Jun 12, 2023 at 11:39:

>
>


Re: Re: Flink job latency and freshness metrics

2023-06-12 Thread casel.chen









我的是flink sql作业,要如何实现你说的方案呢?我看到阿里云实时计算平台VVR是支持展示作业时延指标的,想知道它是如何实现的








在 2023-06-08 16:46:34,"17610775726" <17610775...@163.com> 写道:
>Hi
>
>
>All the monitoring points you mention can be covered with metrics and alerts on them.
>As for the LatencyMarker you mention: it does not take part in the time spent inside an
>operator's own computation, so that metric is not exact; but when the job is
>backpressured the LatencyMarker is also held up, so it still gives a rough picture of
>job latency. If you want an accurate end-to-end latency, you can take a start timestamp
>when consuming from Kafka and an end timestamp in the sink, then report the difference
>through a custom metric and build the end-to-end latency monitoring on that metric (a
>sketch follows at the end of this message).
>
>
>Best
>JasonLee
>
>
> Original message 
>| From | casel.chen |
>| Date | 2023-06-08 16:39 |
>| To | user-zh@flink.apache.org |
>| Subject | Flink job latency and freshness metrics |
>I would like to know which metric tells me how far behind a Flink job currently is. I
>want to set up latency alerts that reflect job health: whether there is backpressure,
>whether more resources are needed, and so on.
>Take real-time synchronization from a MySQL table to a Doris table as an example:
>mysql binlog -> kafka -> flink -> doris
>The latency metrics include:
>1. Business latency: business latency = current system time - event time of the last
>record the system has processed.
>For example: the time a Kafka message is written to Doris minus the time the message
>itself was produced (e.g. when the MySQL record was updated).
>2. Data staleness: staleness = the time the data enters the streaming job - the data's
>event time.
>For example: the time Flink consumes the Kafka message minus the time that message was
>produced (e.g. when the MySQL record was updated).
>
>
>Right now we fall back on Kafka consumer-group lag alerts instead, but that number is
>not accurate: first, Flink only commits offsets on checkpoints; second, production
>traffic varies across the day, so during low-traffic periods the alert is not timely.
>The docs mention a LatencyMarker that can be enabled; once it is on, how do we observe
>the latency? Does that metric have to be reported to Prometheus before it can be read?
>
>
>We also hit another problem: jobs submitted with Flink SQL generate very long metric
>names, because the operatorId is derived from the SQL text, and that keeps blowing up
>Prometheus. Is there any way around this?
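As an illustrative sketch of the end-to-end measurement suggested in the reply above
(not from the thread; it assumes each record carries its original event timestamp in
epoch milliseconds): a gauge that reports "now minus event time", registered in an
operator placed close to the sink.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

// Chain this right before the sink; the gauge shows up like any other operator metric
// and can be scraped by the Prometheus reporter.
public class EndToEndLatencyGauge extends RichMapFunction<Long, Long> {

    // Latest observed end-to-end latency in milliseconds.
    private transient long lastLatencyMs;

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().getMetricGroup().gauge("endToEndLatencyMs", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return lastLatencyMs;
            }
        });
    }

    @Override
    public Long map(Long eventTimeMs) {
        lastLatencyMs = System.currentTimeMillis() - eventTimeMs;
        return eventTimeMs;
    }
}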