Does flink-connector-jdbc support multiple VALUES rows?

2022-03-06 Thread payne_z
Does flink-connector-jdbc support writing multiple VALUES rows in a single statement?

Re:Re:Re:Reply: FlinkKafkaProducer issue

2022-03-06 Thread 潘明文
HI,


 Flink still reports the following error:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker


The code is as follows:
Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
prop.setProperty("acks", "all");
// Enable producer idempotence so that data written to the broker is not duplicated
prop.setProperty("enable.idempotence", "true");
// Set the FlinkKafkaProducer transaction timeout; the broker's maximum transaction timeout defaults to 15 minutes
prop.setProperty("transaction.timeout.ms", transaction + "");
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);


FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(
WRITE_TOPIC,
serializationSchema,
prop,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); 
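
[Editor's note] As the error text itself says, this fencing error means either a newer producer took over the same transactionalId, or the broker expired the transaction. Below is a minimal sketch of a configuration that addresses both usual causes, reusing the KAFKA_ADDR, serializationSchema, and env bindings from the snippet above. It assumes a one-minute checkpoint interval; also, as far as I can tell, with Semantic.EXACTLY_ONCE the connector derives its own per-subtask transactional ids, so a static "my_tx_id" is likely not what ends up on the wire (worth verifying against your connector version):

Properties prop = new Properties();
prop.setProperty("bootstrap.servers", KAFKA_ADDR);
// Keep the transaction timeout well above the checkpoint interval, but at or
// below the broker's transaction.max.timeout.ms (15 minutes by default).
prop.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, String.valueOf(15 * 60 * 1000));
// No TRANSACTIONAL_ID_CONFIG here: the exactly-once producer manages its own id pool.

env.enableCheckpointing(60_000); // checkpoints complete well inside the transaction timeout

FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
        WRITE_TOPIC,
        serializationSchema,
        prop,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);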

At 2022-03-07 10:06:45, "潘明文"  wrote:

Currently the following error occurs:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker

On 2022-01-21 15:15:51, "潘明文" wrote:
>Hi,
>   OK, I'll give that a try. The Flink version is 1.12.4 and the kafka-connector is
> flink-connector-kafka_2.11
>
>On 2022-01-21 14:36:06, "selves_nan" wrote:
>>Hi, you can choose the transactional id yourself. If it still fails after you set one, could you share the Flink and kafka-connector versions you are using? I don't see a related API in the version I'm on.
>>
>>
>>selves_nan
>>selves_...@163.com
>>
>>
>>On 2022-01-21 13:00, 潘明文 wrote:
>>HI,
>>"生产者的事务id"  怎么获取呀?
>>
>>On 2022-01-21 10:41:37, "selves_nan" wrote:
>>Hi, I think prop is missing some of the configuration a Kafka transactional producer needs. Try adding the settings below.
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
>>// Enable idempotence
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>>
>>selves_nan
>>selves_...@163.com
>>
>>
>>On 2022-01-20 14:39, 潘明文 wrote:
>>hi,
>>When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime, and I don't know why.
>>
>>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
>>The producer attempted to use a producer id which is not currently assigned 
>>to its transactional id.
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at 
>>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at 
>>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>

Re: Require help regarding possible issue/bug I'm facing while using Flink

2022-03-06 Thread Qingsheng Ren
Hi De Xun, 

I posted an answer on StackOverflow and hope it is helpful. I'd like to repost 
it here for the convenience of people on the mailing list.

The first call of RowRowConverter::toInternal is an internal implementation detail 
for making a deep copy of the StreamRecord emitted by the table source; it is 
independent of the converter in your map function.

The cause of the NPE is that the RowRowConverter in the map function is never 
initialized by calling RowRowConverter::open. You can use a RichMapFunction 
instead and invoke RowRowConverter::open inside RichMapFunction::open.
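
[Editor's note] A minimal sketch of that suggestion, assuming the stream carries RowData and that a DataType describing the row schema is available; the class and field names here are placeholders:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.RowRowConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

public class RowDataToRowMapper extends RichMapFunction<RowData, Row> {

    private final DataType dataType;
    private transient RowRowConverter converter;

    public RowDataToRowMapper(DataType dataType) {
        this.dataType = dataType;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        converter = RowRowConverter.create(dataType);
        // Without this call the converter's inner field converters stay
        // uninitialized, which is what produces the NPE described above.
        converter.open(getRuntimeContext().getUserCodeClassLoader());
    }

    @Override
    public Row map(RowData value) {
        return converter.toExternal(value);
    }
}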

Best regards,

Qingsheng Ren

> On Mar 7, 2022, at 09:16, Chia De Xun .  wrote:
> 
> Greetings,
> 
> I'm facing a difficult issue/bug while working with Flink. Would definitely 
> appreciate some official expert help on this issue. I have posted my problem 
> on StackOverflow, but have no replies at the moment. 
> 
> Let me know if you have any questions/clarifications for me! It would be best 
> appreciated.
> 
> Best Regards,
> De Xun



Re: PyFlink : submission via rest

2022-03-06 Thread Dian Fu
A dedicated REST API is still not supported. However, you could use
PythonDriver just like you said and submit it as if it were a Java Flink
job.
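
[Editor's note] For reference, a rough sketch of driving that through the /jars/:jarid/run endpoint, assuming the flink-python jar has already been uploaded via /jars/upload and that the PythonDriver in your version accepts a --python argument; both assumptions are worth verifying, and the jar id and paths below are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitPyFlinkJob {
    public static void main(String[] args) throws Exception {
        // Hypothetical id returned by a prior POST to /jars/upload.
        String jarId = "abc123_flink-python_2.11-1.13.0.jar";
        String body = "{"
                + "\"entryClass\":\"org.apache.flink.client.python.PythonDriver\","
                + "\"programArgs\":\"--python /opt/jobs/word_count.py\""
                + "}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // {"jobid":"..."} on success
    }
}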

Regards,
Dian

On Sun, Mar 6, 2022 at 3:38 AM aryan m  wrote:

> Thanks Zhilong for taking a look!
>
> Primarily I am looking for ways to start it through a REST api [1]   . For
> Java, I pass along entry-class pointing to a main class in the jar which
> constructs the job graph and triggers the execute(). How do we accomplish
> this for pyflink jobs?  The closest I encountered is PythonDriver [2]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
> [2]
> https://github.com/apache/flink/blob/release-1.13/flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java
>
>
> On Sat, Mar 5, 2022 at 10:37 AM Zhilong Hong  wrote:
>
>> Hi, Aryan:
>>
>> You could refer to the official docs [1] for how to submit PyFlink jobs.
>>
>> $ ./bin/flink run \
>>   --target yarn-per-job \
>>   --python examples/python/table/word_count.py
>>
>> With this command you can submit a per-job application to YARN. The docs
>> [2] and [3] describe how to submit jobs to the YARN session and the
>> Kubernetes session.
>>
>> $ ./bin/flink run -t yarn-session \
>>   -Dyarn.application.id=application__YY \
>>   --python examples/python/table/word_count.py
>>
>> $ ./bin/flink run -t kubernetes-session \
>>   -Dkubernetes.cluster-id=my-first-flink-cluster \
>>   --python examples/python/table/word_count.py
>>
>> Best,
>> Zhilong
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#submitting-pyflink-jobs
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/yarn/#session-mode
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-master/zh/docs/deployment/resource-providers/native_kubernetes/#session-mode
>>
>> On Sun, Mar 6, 2022 at 2:08 AM aryan m  wrote:
>>
>>> Hi !
>>>    In a session cluster, what is the recommended way to submit a PyFlink
>>> job via REST? I am on Flink 1.13 and my job code is available at
>>> web.upload.dir.
>>>
>>>   Appreciate the help!


Re: Pyflink1.13 or JavaFlink1.13 + Jpython + Python2.7, which way has better performance?

2022-03-06 Thread Dian Fu
Hi Vtygoss,

>> As far as I know, the Python APIs only provide a subset of about 2/3 of
what's available in the Java APIs; the performance of PyFlink is worse than
Java Flink, and some features contributed after 1.10 are not implemented in
PyFlink yet.
There are two levels of API in Flink: Table API and DataStream API.
Regarding Table API, AFAIK, it has aligned most of the functionalities
provided in the Java Table API. Regarding DataStream API, there are still
several features not aligned, e.g. side output, broadcast state, join, etc.
However, I guess that most commonly used features should have already been
supported. Do you have a clear understanding of the features you want to
use?  If so, we could evaluate if there are problems.

Besides, regarding "some features contributed after 1.10 are not
implemented in PyFlink yet", usually we are trying to avoid this as much as
possible. Could you share the missing features in your mind? It could help
us to improve this.

>> And Python code can be compiled to Java bytecode (via ASM) and
loaded into the JVM, so can I argue that the Python code is not much less
efficient than Java code?
For the JPython solution, there are a few known limitations. If it's not
much of a problem for you, I think you could give it a try.

Regards,
Dian


On Fri, Mar 4, 2022 at 2:36 PM vtygoss  wrote:

> Hi, community!
>
>
> I am migrating our data processing from a full-data pipeline to an
> incremental pipeline, from PySpark with Python code to one of the two
> options below:
>
>
> 1. PyFlink 1.13 + Python 2.7
>
> 2. JavaFlink 1.13 + JPython + Python 2.7
>
>
> As far as I know, the Python APIs only provide a subset of about 2/3 of
> what's available in the Java APIs; the performance of PyFlink is worse than
> Java Flink and some features contributed after 1.10 are not implemented in
> PyFlink yet.
>
>
> And Python code can be compiled to Java bytecode (via ASM) and loaded
> into the JVM, so can I argue that the Python code is not much less efficient
> than Java code?
>
>
> So I prefer the second way.
>
> Thanks for any suggestions or replies.
>
>
> Best Regards!
>


Unsubscribe

2022-03-06 Thread 朱福生
Unsubscribe



Sent from my iPhone

Reply: Re: Flink job fails frequently

2022-03-06 Thread (sender name lost to mis-encoding)
(Message body and quoted original are unrecoverable mojibake.)

Reply: Flink job fails frequently

2022-03-06 Thread (sender name lost to mis-encoding)
(Message body and quoted original are unrecoverable mojibake.)

Re:Re: Flink job fails frequently

2022-03-06 Thread 潘明文



The log shows:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 
'cdh04/192.168.0.12:45843'. This might indicate that the remote task manager 
was lost.
at 
org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:160)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
at 
org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:94)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:818)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)

On 2022-03-07 11:01:22, "yue ma" wrote:
>This error means a TaskManager disconnected. I would start by checking the logs
>of the lost TM 'cdh02/xxx:42892' for exceptions; if there is nothing there, also
>check the machine metrics of that host for anomalies.
>
>潘明文 wrote on Mon, Mar 7, 2022 at 10:21:
>
>> HI, a job that reads from Kafka and writes to HBase and Kafka
>> fails frequently with:
>>
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
>> This might indicate that the remote task manager was lost.


Re: Flink job fails frequently

2022-03-06 Thread yue ma
This error means a TaskManager disconnected. I would start by checking the logs
of the lost TM 'cdh02/xxx:42892' for exceptions; if there is nothing there, also
check the machine metrics of that host for anomalies.

潘明文 wrote on Mon, Mar 7, 2022 at 10:21:

> HI, a job that reads from Kafka and writes to HBase and Kafka
> fails frequently with:
>
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'.
> This might indicate that the remote task manager was lost.


Re: Submit job to a session cluster on Kubernetes via REST API

2022-03-06 Thread Yang Wang
If you want to use the RestClusterClient to do the job submission and
lifecycle management, the implementation in the
flink-kubernetes-operator[1] project may give you some insights.

You could also use /jars/:jarid/run[2] to run a Flink job. It is a pure
HTTP interface.
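
[Editor's note] As a starting point, a small sketch of talking to a session cluster with RestClusterClient, here just listing running jobs. The REST address and cluster id are placeholders, and the constructor details may differ between Flink versions:

import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

public class ListSessionJobs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point at the session cluster's REST endpoint (placeholder service name).
        conf.setString(RestOptions.ADDRESS, "my-session-cluster-rest");
        conf.setInteger(RestOptions.PORT, 8081);
        RestClusterClient<String> client =
                new RestClusterClient<>(conf, "my-first-flink-cluster");
        try {
            for (JobStatusMessage job : client.listJobs().get()) {
                System.out.println(job.getJobId() + " -> " + job.getJobState());
            }
        } finally {
            client.close();
        }
    }
}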


[1].
https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java#L126
[2].
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run

Best,
Yang

Almog Rozencwajg wrote on Sun, Mar 6, 2022 at 15:29:

> Hi,
>
>
>
> We deploy a Flink session cluster on Kubernetes.
>
> We want to submit jobs via java application using the REST API
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/rest_api/
> .
>
> I'm trying to use the RestClusterClient which comes with the flink-clients
> module but I couldn't find any examples in the documentation.
>
> What is the correct way to submit a job to a native Kubernetes session
> cluster using the REST API, work with the RestClusterClient or can we use
> any other REST client?
>
> Is there an example of how to work with the RestClusterClient?
>
>
>
> Thanks,
>
> Almog
>


Flink job fails frequently

2022-03-06 Thread 潘明文
HI, a job that reads from Kafka and writes to HBase and Kafka
fails frequently with:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This 
might indicate that the remote task manager was lost.

io.network.netty.exception

2022-03-06 Thread 潘明文
HI, a job that reads from Kafka and writes to HBase and Kafka
fails frequently with:

org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: 
Connection unexpectedly closed by remote task manager 'cdh02/xxx:42892'. This 
might indicate that the remote task manager was lost.

Re:Re:Reply: FlinkKafkaProducer issue

2022-03-06 Thread 潘明文
Currently the following error occurs:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: Producer attempted an operation with an old epoch. Either there 
is a newer producer with the same transactionalId, or the producer's 
transaction has been expired by the broker

On 2022-01-21 15:15:51, "潘明文" wrote:
>Hi,
>   OK, I'll give that a try. The Flink version is 1.12.4 and the kafka-connector is
> flink-connector-kafka_2.11
>
>On 2022-01-21 14:36:06, "selves_nan" wrote:
>>Hi, you can choose the transactional id yourself. If it still fails after you set one, could you share the Flink and kafka-connector versions you are using? I don't see a related API in the version I'm on.
>>
>>
>>selves_nan
>>selves_...@163.com
>>
>>
>>On 2022-01-21 13:00, 潘明文 wrote:
>>HI,
>>"生产者的事务id"  怎么获取呀?
>>
>>On 2022-01-21 10:41:37, "selves_nan" wrote:
>>Hi, I think prop is missing some of the configuration a Kafka transactional producer needs. Try adding the settings below.
>>prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "the producer's transactional id");
>>// Enable idempotence
>>prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
>>
>>
>>selves_nan
>>selves_...@163.com
>>
>>
>>On 2022-01-20 14:39, 潘明文 wrote:
>>hi,
>>When I create a FlinkKafkaProducer, the following error sometimes occurs at runtime, and I don't know why.
>>
>>FlinkKafkaProducer kafkaSink = new FlinkKafkaProducer<>(WRITE_TOPIC, 
>>new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), prop, 
>>FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
>>
>>
>>org.apache.kafka.common.KafkaException: Unhandled error in EndTxnResponse: 
>>The producer attempted to use a producer id which is not currently assigned 
>>to its transactional id.
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$EndTxnHandler.handleResponse(TransactionManager.java:1362)
>>at 
>>org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
>>at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>>at 
>>org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
>>at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
>>at 
>>org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
>>at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
>>at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>>at java.lang.Thread.run(Thread.java:748)
>>Suppressed: java.lang.NullPointerException
>>


Re: How to sort Iterable in ProcessWindowFunction?

2022-03-06 Thread yidan zhao
Collect the elements to a list, then sort, then collect out.
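
[Editor's note] A minimal sketch of that approach, with the element type assumed to be a Tuple4 whose second field drives the ordering; adjust the types and comparator to the actual tuple:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class SortedWindowFunction
        extends ProcessWindowFunction<Tuple4<String, Long, String, Long>, String, String, TimeWindow> {

    @Override
    public void process(String key, Context context,
                        Iterable<Tuple4<String, Long, String, Long>> input,
                        Collector<String> out) {
        // The Iterable itself cannot be sorted, so copy it into a list first.
        List<Tuple4<String, Long, String, Long>> sorted = new ArrayList<>();
        input.forEach(sorted::add);
        // Sort by the field of interest, here the elapsed-time field f1.
        sorted.sort(Comparator.comparing(t -> t.f1));
        for (Tuple4<String, Long, String, Long> t : sorted) {
            out.collect(key + " @ " + context.window() + " -> " + t);
        }
    }
}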

HG wrote on Thu, Mar 3, 2022 at 22:13:

>   Hi,
> I need to sort the input of the ProcessWindowFunction by one of the
> fields of the Tuple4 in the Iterable.
>
> Any advice as to what the best way is?
>
>  static class MyProcessWindowFunction extends
> ProcessWindowFunction, String, String,
> TimeWindow> {
> @Override
> public void process(String key, Context context,
> Iterable> input, Collector out)
> {
> Long elapsed   = 0L;
> Long pHandlingTime = 0L;
> Long totalElapsed  = 0L;
>
> System.out.println(input.getClass());
>
> Iterator> etter =
> input.iterator();
> for (Tuple4 in : input) {
> transactionId = in.getField(2).toString();
> elapsed   = Long.parseLong(in.getField(1).toString())
> - pHandlingTime;
> totalElapsed  = totalElapsed + elapsed;
> pHandlingTime = Long.parseLong(in.getField(1).toString());
>
> out.collect("Key : " + key + " Window : " +
> context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
> elapsed.toString() + "  max handling time : " + h.toString() + "
> totalElapsed " + totalElapsed);
> }
> }
> }
>
>
> Op do 3 mrt. 2022 om 15:12 schreef HG :
>
>> Hi,
>> I need to sort the input of the ProcessWindowFunction by one of the
>> fields of the Tuple4 in the Iterable.
>>
>>  static class MyProcessWindowFunction extends
>> ProcessWindowFunction, String, String,
>> TimeWindow> {
>> @Override
>> public void process(String key, Context context,
>> Iterable> input, Collector out)
>> {
>> Long elapsed   = 0L;
>> Long pHandlingTime = 0L;
>> Long totalElapsed  = 0L;
>>
>> System.out.println(input.getClass());
>>
>> Iterator> etter =
>> input.iterator();
>> for (Tuple4 in : input) {
>> transactionId = in.getField(2).toString();
>> elapsed   = Long.parseLong(in.getField(1).toString())
>> - pHandlingTime;
>> totalElapsed  = totalElapsed + elapsed;
>> pHandlingTime = Long.parseLong(in.getField(1).toString());
>>
>> out.collect("Key : " + key + " Window : " +
>> context.window() + "  transactionId : "  + transactionId + "  elapsed : " +
>> elapsed.toString() + "  max handling time : " + h.toString() + "
>> totalElapsed " + totalElapsed);
>> }
>> }
>> }
>>
>


Require help regarding possible issue/bug I'm facing while using Flink

2022-03-06 Thread Chia De Xun .
Greetings,

I'm facing a difficult issue/bug while working with Flink. Would definitely
appreciate some official expert help on this issue. I have posted my
problem on StackOverflow, but have no replies at the moment.

Let me know if you have any questions/clarifications for me! It would be
best appreciated.

Best Regards,
De Xun


Re: Question about Flink counters

2022-03-06 Thread Shane Bishop
Hi Zhanghao Chen,

Sure, I can give some context.

My team's Flink application runs as a Kinesis Data Analytics streaming 
application [1] in AWS.

Our application receives events from Amazon Simple Queue Service (SQS) [2] in 
our source, and then uses a property of the SQS event to download from Amazon 
S3 [3]. The external metrics system for our counters is Amazon CloudWatch 
metrics [4].

For both the SQS consumer source and our S3 downloader operator, we have a 
counter for number of received items, number of successfully processed items, 
and number of items that failed to process.

However, during testing we have found that the counts for SQS events received 
and S3 downloads are much too high. The counts for our counters in CloudWatch 
are much higher than the number of records reported in the Flink dashboard.

The goal is that our metrics in CloudWatch should accurately reflect the number 
of SQS events received and successfully or unsuccessfully processed, and the 
number of S3 downloads that were attempted and succeeded or failed.

I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] 
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html


From: Zhanghao Chen 
Sent: March 5, 2022 11:11 PM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Question about Flink counters

Hi Shane,

Could you share more information on what you would like to use the counter for?

The counter discussed here is primarily designed for exposing counts to 
external metric systems. Usually, each task would count on its own, and it is 
left for the external metric system (usu. a time series database) to do 
aggregations. Also, you cannot reference a counter from a different machine. 
I'm not sure if this is what you expected.

Best,
Zhanghao Chen

From: Shane Bishop 
Sent: Saturday, March 5, 2022 23:22
To: Zhanghao Chen ; user@flink.apache.org 

Subject: Re: Question about Flink counters

If I used a thread-safe counter implementation, would that be enough to make 
the count correct for a Flink cluster with multiple machines?

Best,
Shane

From: Zhanghao Chen 
Sent: March 4, 2022 11:08 PM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The 
default implementation, SimpleCounter, which is not thread-safe, is used when 
you call counter(String name) on a MetricGroup. Therefore, you'll need to 
use your own thread-safe implementation; check out the second example on the 
Metrics page of the Apache Flink documentation for reference.
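
[Editor's note] Along the lines of that documentation example, a minimal thread-safe Counter sketch backed by AtomicLong; the registration shown in the trailing comment is an assumed usage from inside a RichFunction:

import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Counter;

public class AtomicCounter implements Counter {
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}

// Registered from a RichFunction's open(), e.g.:
// received = getRuntimeContext().getMetricGroup().counter("received", new AtomicCounter());

Note that each parallel subtask still keeps its own counter instance; thread safety does not merge counts across machines, so the external metric system still does the final aggregation, as described above.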

Best,
Zhanghao Chen

From: Shane Bishop 
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org 
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across 
all parallel tasks? I.e., is there a guarantee that the counter values will not 
be higher than expected?

Thanks,
Shane

---
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter


Re: Incremental checkpointing & RocksDB Serialization

2022-03-06 Thread Yun Tang
Hi Vidya,


  1.  You could tune your job to avoid backpressure. You might also upgrade your 
Flink engine to at least Flink 1.13, which makes it easier to monitor the 
back-pressure status [1].
  2.  You can refer to [2] to learn how to customize your serializer.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/monitoring/back_pressure/
[2] 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/custom_serialization/

Best,
Yun Tang

From: Vidya Sagar Mula 
Sent: Sunday, March 6, 2022 4:16
To: Yun Tang 
Cc: user 
Subject: Re: Incremental checkpointing & RocksDB Serialization

Hi Yun Tang,
Thank you for the reply. I have follow-up questions and need some more details. 
Can you please clarify my inline questions?

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess this is because the job is under back pressure at the end of the window. 
You can expand the checkpoint details to see whether the async duration of 
each task is much shorter than the e2e duration. If so, the checkpoint 
barrier stayed in the channel longer.

 - Yes, I expanded the checkpoint details and noticed that the e2e duration is 
much higher than the async duration. I am attaching a screenshot (Checkpoint #59). 
Can you elaborate on "the checkpoint barrier stays in the channel longer"? 
What are the suggested ways to mitigate this? I am wondering how it can be 
avoided, since it happens only at the end of the window.


> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

From our experience, Kryo is not a good choice in most cases.

 - What are your recommendations for other serializers? I tried to switch to 
Avro by setting the "forceAvro" flag to TRUE in the ExecutionConfig, but 
RocksDB still picks the KryoSerializer, because the transformation's key type 
is classified as a GenericType. I am not sure what changes need to be made to 
my class/POJO for it to use the Avro serializer.
Can you please suggest how to switch to a better serializer?
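
[Editor's note] One sketch of a way to investigate, not verified against Flink 1.11: disableGenericTypes() makes the job fail fast wherever Kryo would be chosen, which pinpoints the fields that fall back to GenericType, and enableForceAvro() only affects types Flink already recognizes as POJOs, so the class usually has to be reshaped into a POJO first. The Event class below is a placeholder:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SerializerSetup {

    // A Flink POJO: public class, public no-argument constructor, and
    // public fields (or getters/setters) of serializable types.
    public static class Event {
        public String transactionId;
        public long handlingTime;
        public Event() {}
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes(); // throws where Kryo would be used
        env.getConfig().enableForceAvro();     // recognized POJOs are serialized with Avro
        // ... build the pipeline with Event as the key/state type ...
    }
}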



On Fri, Mar 4, 2022 at 2:06 AM Yun Tang <myas...@live.com> wrote:
Hi Vidya,

> Why is the incremental checkpointing taking more time for the snapshot at the 
> end of the window duration?

I guess this is because the job is under back pressure at the end of the window. 
You can expand the checkpoint details to see whether the async duration of 
each task is much shorter than the e2e duration. If so, the checkpoint 
barrier stayed in the channel longer.

> Why is RocksDB serialization causing the CPU peak?

This is caused by the implementation of your serializer.

> Do you suggest any change in the serializer type in the RocksDB? (Kryo vs 
> Avro)

From our experience, Kryo is not a good choice in most cases.

Best
Yun Tang

From: Vidya Sagar Mula <mulasa...@gmail.com>
Sent: Friday, March 4, 2022 17:00
To: user <user@flink.apache.org>
Subject: Incremental checkpointing & RocksDB Serialization

Hi,

I have a cluster that contains the Flink 1.11 version with AWS - S3 backend. I 
am trying the incremental checkpointing on this set up. I have a pipeline with 
a 10 mins window and incremental checkpointing happens every 2 mins.

Observation:
-
I am observing a long duration for the snapshot at the end of each window, i.e. 
the last checkpoint of every window (almost every time).
I am attaching the Flink UI checkpoint history.

My set up details:
---
Cluster: Cloud cluster with instance storage.
Memory : 20 GB,
Heap : 10 GB
Flink Managed Memory: 4.5 GB
Flink Version : 1.11
CPUs : 2

ROCKSDB_WRITE_BUFFER_SIZE: "2097152000"  ## 2GB

ROCKSDB_BLOCK_CACHE_SIZE: "104857600"## 100 Mb

ROCKSDB_BLOCK_SIZE: "5242880"  ## 5 Mb

ROCKSDB_CHECKPOINT_TRANSFER_THREAD_NUM: 4

ROCKSDB_MAX_BACKGROUND_THREADS: 4


In the analysis, I noticed that CPU utilization peaks at almost 100% at the time 
of the issue. Further analysis of thread dumps taken during the CPU peak shows a 
RocksDB serialization-related call trace. All the thread samples point to this 
stack.

Based on the pipeline's transformation class type, RocksDB chooses the Kryo 
serializer. I did try to change the serializer type, but that is not the focal 
point I want to stress here.

I would like to understand the reason for the high CPU utilization. I have tried 
increasing the CPU count from 2 to 4, but it did not give better results. 
I have parallelism 2.

Please take a look at the stack trace below. Can you suggest why 
serialization/deserialization in RocksDB takes so much CPU?



Stack-1, Stack-2, Stack-3 are attached to this email.

Questions:
---
- Why is the incremental checkpointing taking more time for the snapshot at the 
end of the window duration?
- Why is