Flink Metric Naming And Reporting Confusion

2023-02-17 Thread Saad Mufti
Hi,

My team just started coding a new Flink app to be deployed on AWS EMR.
We are a little confused by the metric naming for built-in metrics (not
ones we create ourselves) and by the reporting of metrics via StatsD.

We have added configuration to flink-conf.yaml to set up a StatsD
reporter, which reports to a proprietary backend of our own. What we have
observed is that we're getting metrics of the form:

ip-10-76-10-112_ec2_internal.taskmanager.container_1676482557753_0002_01_06.Status.JVM.CPU.Load
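
For reference, the reporter section of our flink-conf.yaml looks roughly
like this (a minimal sketch; the reporter name, host, and port are
placeholders for our actual values):

  metrics.reporter.stsd.factory.class: org.apache.flink.metrics.statsd.StatsDReporterFactory
  metrics.reporter.stsd.host: localhost   # placeholder
  metrics.reporter.stsd.port: 8125        # placeholder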

Going through the documentation (
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-scope),
it appears the IP address and container ID are added by the scope for
built-in system metrics. But we can't understand how it is useful to give
every task's metric a totally different name. Why doesn't it instead report
all of these under the name "Status.JVM.CPU.Load" and add the IP address and
container ID as tags? What's the design motivation for changing every metric
name as opposed to using tags?
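
From that page, the prefix appears to come from the configurable scope
formats; these are the documented defaults, which we have not overridden
(shown as they would appear in flink-conf.yaml):

  metrics.scope.tm: <host>.taskmanager.<tm_id>
  metrics.scope.task: <host>.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
  metrics.scope.operator: <host>.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

In our example above, <host> and <tm_id> expand to the IP address and the
YARN container ID, so every TaskManager ends up reporting under its own name.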

Also, we're getting a few of the metrics listed at
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#system-metrics
in our backend, but none of the application/operator-level metrics that
are supposedly also maintained by the framework. We CAN see those directly
in the Flink dashboard. So why aren't they also being reported via the
StatsD reporter?

Any help or insight would be most appreciated.

Thanks.


Saad


Re: KafkaSink handling message size produce errors

2023-02-17 Thread Jing Ge via user
ticket created: https://issues.apache.org/jira/browse/FLINK-31121

On Fri, Feb 17, 2023 at 9:59 AM Hatem Mostafa  wrote:

> Thanks for adding this to your backlog, I think it's definitely a very
> useful feature.
>
> Can you provide an example of how to extend KafkaSink to
> add this error handling? I have tried to do so but did not find it
> straightforward, since errors are thrown in the deliveryCallback of
> KafkaWriter, and KafkaSink is not extendable since all its members are
> private and the constructor is package-private.
>
> On Fri, Feb 17, 2023 at 8:17 AM Shammon FY  wrote:
>
>> Hi Jing,
>>
>> Sounds good to me; we can add an option for it.
>>
>> Best,
>> Shammon
>>
>>
>> On Fri, Feb 17, 2023 at 3:13 PM Jing Ge  wrote:
>>
>>> Hi,
>>>
>>> It makes sense to offer this feature of catching and ignoring the
>>> exception, with a config switch to turn it on/off, when we put ourselves in
>>> users' shoes. WDYT? I will create a ticket if most of you consider it a
>>> good feature to help users.
>>>
>>> Best regards,
>>> Jing
>>>
>>> On Fri, Feb 17, 2023 at 6:01 AM Shammon FY  wrote:
>>>
 Hi Hatem

 As mentioned above, you can extend the KafkaSink or create a UDF and
 process the records before the sink.

 Best,
 Shammon

 On Fri, Feb 17, 2023 at 9:54 AM yuxia 
 wrote:

> Hi, Hatem.
> I think there is no way to catch the exception and then ignore it in the
> current implementation of KafkaSink. You may indeed need to extend the
> KafkaSink.
>
> Best regards,
> Yuxia
>
> --
> *From: *"Hatem Mostafa" 
> *To: *"User" 
> *Sent: *Thursday, February 16, 2023, 9:32:44 PM
> *Subject: *KafkaSink handling message size produce errors
>
> Hello,
> I am writing a Flink job that reads from and writes to Kafka. It uses
> a window operator and eventually writes the result of the window into a
> Kafka topic. The accumulated data can exceed the maximum message size after
> compression on the producer level. I want to be able to catch the exception
> coming from the producer and ignore this window. I could not find a way to
> do that in KafkaSink; is there a way to do so?
>
> I attached here an example of an error that I would like to handle
> gracefully.
>
> [image: image.png]
>
>
> This question is similar to one that was asked on Stack Overflow here, but
> the answer is only relevant for older versions of Flink.
>
> Regards,
> Hatem
>
>


Flink 1.16 writing to Kafka fails with: Cluster authorization failed.

2023-02-17 Thread lxk
Flink version: 1.16
Our company is upgrading Flink from 1.14 to 1.16. The code was not changed
at all, but writing to Kafka now fails with:
2023-02-17 15:03:19
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1125)
at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:442)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:998)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:912)
at org.apache.flink.connector.kafka.sink.KafkaWriter.write(KafkaWriter.java:197)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:160)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.


After reading through the relevant source code, I understand that the new
KafkaSink API implements exactly-once in two phases, a writer and a committer,
and that the writer maintains a producer pool, so it needs permission to
create producers; that part makes sense.
But with the old Kafka API, FlinkKafkaProducer, only a single producer needs
to be maintained. I don't understand why the same error is still reported when
using the old API.
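
For context, an exactly-once KafkaSink is typically built roughly like this
(a minimal sketch, not my actual code; the bootstrap servers, topic, and
transactional id prefix are placeholders):

  import org.apache.flink.api.common.serialization.SimpleStringSchema;
  import org.apache.flink.connector.base.DeliveryGuarantee;
  import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
  import org.apache.flink.connector.kafka.sink.KafkaSink;

  KafkaSink<String> sink = KafkaSink.<String>builder()
          .setBootstrapServers("broker:9092")            // placeholder
          .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                  .setTopic("output-topic")              // placeholder
                  .setValueSerializationSchema(new SimpleStringSchema())
                  .build())
          // EXACTLY_ONCE uses Kafka transactions, which is where the writer's
          // transactional producer pool (and its extra permissions) comes in
          .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
          .setTransactionalIdPrefix("my-app")            // placeholder
          .build();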


Or maybe what I described is not the root cause of this error at all; I'd
appreciate any help in figuring it out.

Re: Cannot take a memory dump of a Flink program

2023-02-17 Thread Guo Thompson
It may be that the heartbeat timeout between the JM and TM is too short; the
dump process stops the world, so the TM stops responding to the JM's
heartbeats.
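
If that is the cause, raising the heartbeat timeout in flink-conf.yaml might
give the TM enough headroom to survive the stop-the-world pause. A sketch (the
timeout value is illustrative; the defaults are a 10s interval and a 50s
timeout):

  heartbeat.interval: 10000    # ms between heartbeat requests (default)
  heartbeat.timeout: 300000    # ms without a heartbeat before the TM is marked dead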

On Tue, Feb 14, 2023 at 14:32 lxk  wrote:

> Flink version: 1.16
> Java version: jdk1.8.0_251
> Problem: a recently deployed Flink program does frequent young GC, once
> every few seconds. Adjusting the young-generation size did not resolve it;
> the total JVM heap is currently 3.57 GB. We therefore wanted to analyze the
> program's memory to locate the problem. Using the applicationId on YARN, we
> run ps -ef|grep 1666758697316_2639108 to find the corresponding pid, and
> then run jmap -dump:format=b,file=user.dump 26326 to generate the dump
> file. But for every program we tested, as soon as the dump starts it
> affects the online job: the program's container dies for no obvious reason
> and the job restarts. The exact error after running the command is:
> sun.jvm.hotspot.debugger.UnmappedAddressException: 7f74efa5d410
> https://pic.imgdb.cn/item/63eb2a46f144a010071899ba.png
> Has anyone run into this problem? Are we doing it the wrong way, or is
> there an issue with the version we are using? We'd appreciate any
> suggestions or opinions.


Re: KafkaSink handling message size produce errors

2023-02-17 Thread Hatem Mostafa
Thanks for adding this to your backlog, I think it's definitely a very
useful feature.

Can you provide an example of how to extend KafkaSink to
add this error handling? I have tried to do so but did not find it
straightforward, since errors are thrown in the deliveryCallback of
KafkaWriter, and KafkaSink is not extendable since all its members are
private and the constructor is package-private.
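
In the meantime, the closest thing I can see to the "process the record
before the sink" suggestion above is to guard the sink with a size check,
roughly like this (a sketch only: the 1 MB limit, the stream and sink names,
and the assumption of string records are illustrative, and a
pre-serialization check cannot account for producer-side batching or
compression, so it is not a full substitute for catching the producer error):

  import java.nio.charset.StandardCharsets;
  import org.apache.flink.streaming.api.datastream.DataStream;

  // placeholder for the broker's max.message.bytes limit
  final int maxBytes = 1_000_000;

  // drop window results whose serialized size would exceed the limit,
  // instead of letting the producer fail the whole job
  DataStream<String> bounded = windowedResults
          .filter(value -> value.getBytes(StandardCharsets.UTF_8).length <= maxBytes);

  bounded.sinkTo(kafkaSink);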

On Fri, Feb 17, 2023 at 8:17 AM Shammon FY  wrote:

> Hi Jing,
>
> Sounds good to me; we can add an option for it.
>
> Best,
> Shammon
>
>
> On Fri, Feb 17, 2023 at 3:13 PM Jing Ge  wrote:
>
>> Hi,
>>
>> It makes sense to offer this feature of catching and ignoring the
>> exception, with a config switch to turn it on/off, when we put ourselves in
>> users' shoes. WDYT? I will create a ticket if most of you consider it a
>> good feature to help users.
>>
>> Best regards,
>> Jing
>>
>> On Fri, Feb 17, 2023 at 6:01 AM Shammon FY  wrote:
>>
>>> Hi Hatem
>>>
>>> As mentioned above, you can extend the KafkaSink or create a UDF and
>>> process the records before the sink.
>>>
>>> Best,
>>> Shammon
>>>
>>> On Fri, Feb 17, 2023 at 9:54 AM yuxia 
>>> wrote:
>>>
 Hi, Hatem.
 I think there is no way to catch the exception and then ignore it in the
 current implementation of KafkaSink. You may indeed need to extend the
 KafkaSink.

 Best regards,
 Yuxia

 --
 *From: *"Hatem Mostafa" 
 *To: *"User" 
 *Sent: *Thursday, February 16, 2023, 9:32:44 PM
 *Subject: *KafkaSink handling message size produce errors

 Hello,
 I am writing a Flink job that reads from and writes to Kafka. It uses
 a window operator and eventually writes the result of the window into a
 Kafka topic. The accumulated data can exceed the maximum message size after
 compression on the producer level. I want to be able to catch the exception
 coming from the producer and ignore this window. I could not find a way to
 do that in KafkaSink; is there a way to do so?

 I attached here an example of an error that I would like to handle
 gracefully.

 [image: image.png]


 This question is similar to one that was asked on Stack Overflow here, but
 the answer is only relevant for older versions of Flink.

 Regards,
 Hatem