Re: Can we use CheckpointedFunction with the new Source api?

2022-05-30 Thread Qingsheng Ren
Hi Qing,

I’m afraid CheckpointedFunction cannot be applied to the new Source API. Could 
you share the abstractions of your source implementation, such as what a split 
maps to? Maybe we can find a workaround. 

Best, 

Qingsheng

> On May 30, 2022, at 20:09, Qing Lim  wrote:
> 
> Hi, is it possible to use CheckpointedFunction with the new Source API (the 
> one in package org.apache.flink.api.connector.source)?
>  
> My use case:
>  
> I have a custom source that emits individual node updates from a tree, and I 
> wish to create a stream of whole-tree snapshots, so I will have to 
> accumulate all updates and keep them as state. In addition, I wish to 
> expose this functionality as a library to my organization.
>  
> The custom source is written using the new Source API; I wonder whether we 
> can attach state to it.
>  
> Kind regards
>  
> This e-mail and any attachments are confidential to the addressee(s) and may 
> contain information that is legally privileged and/or confidential. If you 
> are not the intended recipient of this e-mail you are hereby notified that 
> any dissemination, distribution, or copying of its content is strictly 
> prohibited. If you have received this message in error, please notify the 
> sender by return e-mail and destroy the message and all copies in your 
> possession.
> 
> 
> To find out more details about how we may collect, use and share your 
> personal information, please see https://www.mwam.com/privacy-policy. This 
> includes details of how calls you make to us may be recorded in order for us 
> to comply with our legal and regulatory obligations.
> 
> 
> To the extent that the contents of this email constitutes a financial 
> promotion, please note that it is issued only to and/or directed only at 
> persons who are professional clients or eligible counterparties as defined in 
> the FCA Rules. Any investment products or services described in this email 
> are available only to professional clients and eligible counterparties. 
> Persons who are not professional clients or eligible counterparties should 
> not rely or act on the contents of this email.
> 
> 
> Marshall Wace LLP is authorised and regulated by the Financial Conduct 
> Authority. Marshall Wace LLP is a limited liability partnership registered in 
> England and Wales with registered number OC302228 and registered office at 
> George House, 131 Sloane Street, London, SW1X 9AT. If you are receiving this 
> e-mail as a client, or an investor in an investment vehicle, managed or 
> advised by Marshall Wace North America L.P., the sender of this e-mail is 
> communicating with you in the sender's capacity as an associated or related 
> person of Marshall Wace North America L.P. ("MWNA"), which is registered with 
> the US Securities and Exchange Commission ("SEC") as an investment adviser.  
> Registration with the SEC does not imply that MWNA or its employees possess a 
> certain level of skill or training.
> 



Re: Status of File Sink Common (flink-file-sink-common)

2022-05-30 Thread yuxia
I don't think so. It is still in the main repository [1].
[1] 
https://github.com/apache/flink/tree/master/flink-connectors/flink-file-sink-common

Best regards,
Yuxia

----- Original Message -----
From: "Jun Qin"
To: "User"
Sent: Tuesday, May 31, 2022 5:24:10 AM
Subject: Status of File Sink Common (flink-file-sink-common)

Hi,  

Has File Sink Common (flink-file-sink-common) been dropped? If so, since which 
version? I cannot find anything related in the release notes of 1.13.x, 
1.14.x and 1.15.0.

Thanks
Jun


Status of File Sink Common (flink-file-sink-common)

2022-05-30 Thread Jun Qin
Hi,  

Has File Sink Common (flink-file-sink-common) been dropped? If so, since which 
version? I cannot find anything related in the release notes of 1.13.x, 
1.14.x and 1.15.0.

Thanks
Jun

Re:Can we use CheckpointedFunction with the new Source api?

2022-05-30 Thread Xuyang
It seems that you are looking for a way to customize checkpointing with the new 
Source API. I'm not sure whether [1][2] will help you, but you can customize the 
checkpointed state the way KafkaSource does.

[1] 
https://github.com/apache/flink/blob/9be49ff871feace87aed9d4e3f8132bcf0cd3945/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L208
[2] 
https://github.com/apache/flink/blob/9be49ff871feace87aed9d4e3f8132bcf0cd3945/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveSource.java#L107
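
As far as I know, the new Source API has no CheckpointedFunction equivalent: 
what gets checkpointed is the list of splits returned by 
`SourceReader#snapshotState(long checkpointId)` plus the enumerator checkpoint, 
serialized by the `SimpleVersionedSerializer`s the `Source` exposes through 
`getSplitSerializer()` and `getEnumeratorCheckpointSerializer()`. One possible 
workaround along the lines of [1][2] is to carry the accumulated tree inside 
the split, so it is checkpointed and restored together with the split. The 
plain-Java sketch below assumes each split owns one tree and that node updates 
are (nodeId, value) pairs. `TreeSplit` and `TreeSplitSerializer` are 
hypothetical names, and the serializer only mirrors the shape of 
`SimpleVersionedSerializer` so the example runs without Flink on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class TreeSplitCheckpoint {

    /** Hypothetical split: an id plus the tree state accumulated so far. */
    static final class TreeSplit {
        final String splitId;
        final Map<String, String> nodes; // node id -> latest value

        TreeSplit(String splitId, Map<String, String> nodes) {
            this.splitId = splitId;
            this.nodes = new HashMap<>(nodes);
        }

        /** Applies one node update; a null value removes the node. */
        void apply(String nodeId, String value) {
            if (value == null) {
                nodes.remove(nodeId);
            } else {
                nodes.put(nodeId, value);
            }
        }

        /** Read-only copy of the whole tree, e.g. to emit as a snapshot record. */
        Map<String, String> snapshot() {
            return Collections.unmodifiableMap(new HashMap<>(nodes));
        }
    }

    /** Mirrors the shape of Flink's SimpleVersionedSerializer without depending on Flink. */
    static final class TreeSplitSerializer {
        static final int VERSION = 1;

        int getVersion() {
            return VERSION;
        }

        byte[] serialize(TreeSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(split.splitId);
                out.writeInt(split.nodes.size());
                for (Map.Entry<String, String> e : split.nodes.entrySet()) {
                    // writeUTF is limited to 65535 encoded bytes per string
                    out.writeUTF(e.getKey());
                    out.writeUTF(e.getValue());
                }
            }
            return bytes.toByteArray();
        }

        TreeSplit deserialize(int version, byte[] serialized) throws IOException {
            if (version != VERSION) {
                throw new IOException("Unknown serializer version: " + version);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
                String splitId = in.readUTF();
                int count = in.readInt();
                Map<String, String> nodes = new HashMap<>();
                for (int i = 0; i < count; i++) {
                    String key = in.readUTF();
                    nodes.put(key, in.readUTF());
                }
                return new TreeSplit(splitId, nodes);
            }
        }
    }
}
```

In a real connector the reader would apply updates to its splits as records 
arrive and return those splits from `snapshotState`. Note that this stores the 
whole tree in every checkpoint, so for large trees the checkpoint size is a 
trade-off worth measuring.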



At 2022-05-30 20:09:15, "Qing Lim"  wrote:

Hi, is it possible to use CheckpointedFunction with the new Source API (the 
one in package org.apache.flink.api.connector.source)?

My use case:

I have a custom source that emits individual node updates from a tree, and I 
wish to create a stream of whole-tree snapshots, so I will have to 
accumulate all updates and keep them as state. In addition, I wish to 
expose this functionality as a library to my organization.

The custom source is written using the new Source API; I wonder whether we 
can attach state to it.

Kind regards


Can we use CheckpointedFunction with the new Source api?

2022-05-30 Thread Qing Lim
Hi, is it possible to use CheckpointedFunction with the new Source API (the 
one in package org.apache.flink.api.connector.source)?

My use case:

I have a custom source that emits individual node updates from a tree, and I 
wish to create a stream of whole-tree snapshots, so I will have to 
accumulate all updates and keep them as state. In addition, I wish to 
expose this functionality as a library to my organization.

The custom source is written using the new Source API; I wonder whether we 
can attach state to it.

Kind regards



Re: Large backpressure and slow checkpoints in StateFun

2022-05-30 Thread yuxia
Maybe you can use jstack or a flame graph to find the bottleneck. 
BTW, for generating flame graphs, Arthas [1] is a good tool. 

[1] https://github.com/alibaba/arthas 
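
Both jstack and flame graphs rest on the same idea: sample thread stacks 
repeatedly and look at where threads spend their time. As a rough, 
self-contained illustration of that idea (not a replacement for jstack or 
Arthas), the sketch below samples every thread in the current JVM with 
`Thread.getAllStackTraces()` and counts identical stacks in the "folded" 
format (root frame first, frames joined by `;`, followed by a count) that 
flame graph tools commonly take as input. `StackSampler` is a hypothetical 
name; against a running TaskManager you would sample from outside the process 
with jstack or Arthas instead.

```java
import java.util.Map;
import java.util.TreeMap;

public class StackSampler {

    /** Folds one stack trace into "root;...;leaf" form. */
    static String fold(StackTraceElement[] stack) {
        StringBuilder sb = new StringBuilder();
        // getStackTrace() puts the innermost (leaf) frame first, so walk it backwards.
        for (int i = stack.length - 1; i >= 0; i--) {
            if (sb.length() > 0) {
                sb.append(';');
            }
            sb.append(stack[i].getClassName()).append('.').append(stack[i].getMethodName());
        }
        return sb.toString();
    }

    /** Takes `samples` snapshots of all live threads, `intervalMs` apart, and counts folded stacks. */
    static Map<String, Integer> sample(int samples, long intervalMs) throws InterruptedException {
        Map<String, Integer> counts = new TreeMap<>();
        for (int s = 0; s < samples; s++) {
            for (StackTraceElement[] stack : Thread.getAllStackTraces().values()) {
                if (stack.length == 0) {
                    continue; // some system threads report no frames
                }
                counts.merge(fold(stack), 1, Integer::sum);
            }
            Thread.sleep(intervalMs);
        }
        return counts;
    }

    public static void main(String[] args) throws InterruptedException {
        // Each line is "folded-stack count"; the most frequent stacks are the hot spots.
        sample(5, 10).forEach((stack, n) -> System.out.println(stack + " " + n));
    }
}
```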

Best regards, 
Yuxia 


From: "Christopher Gustafson"
To: "User"
Sent: Monday, May 30, 2022 2:29:19 PM
Subject: Large backpressure and slow checkpoints in StateFun

Hi,

I am running some benchmarks using StateFun and have encountered a problem with 
backpressure and slow checkpoints that I can't figure out the reason for, and 
was hoping that someone might have an idea of what is causing it. My setup is 
the following:

I am running the Shopping Cart application from the StateFun playground. The 
job is submitted as an uber jar to an existing Flink cluster with 3 
TaskManagers and 1 JobManager. The functions are served using the Undertow 
example from the documentation, and I am using Kafka ingresses and egresses. My 
workload is only 1000 events/s. Everything runs on separate GCP VMs.

The issue is very long checkpoints, which I assume are caused by a 
backpressured ingress, in turn caused by the function dispatcher operator not 
being able to handle the workload. The only thing that has helped so far is 
increasing the parallelism of the job, but it feels like there is still some 
other bottleneck causing the issues. I have seen other benchmarks reach much 
higher throughput than 1000 events/s without more CPU or memory resources 
than I am using.

Any ideas about bottlenecks or ways to find them are greatly appreciated.

Best Regards,

Christopher Gustafson



Re: Flink 1.14.4 -> 1.15.0 Upgrade Problem

2022-05-30 Thread Yun Gao
Hi Clayton, 

Could you also share the topology of the job?

Also, if convenient, could you check the back-pressure status of each
node? That would help us locate which node is slowing down and might
be causing the lag.

Best,
Yun


--
From:Clayton Wohl 
Send Time:2022 May 26 (Thu.) 00:31
To:user 
Subject:Flink 1.14.4 -> 1.15.0 Upgrade Problem

I have a Flink job that has been running with Flink 1.14.4 perfectly for a few 
months.

I tried upgrading to Flink 1.15.0. There are no error messages or exceptions, 
and it runs perfectly fine for several hours, but after a few hours the Flink 
app starts to lag in processing an input Kafka topic. I can see the lag grow 
linearly in my Grafana dashboards that track Kafka lag. The lag continues to 
grow indefinitely until I manually restart the Flink job. After a restart, the 
job catches up with the old data, the lag drops to zero, and the application 
runs fine for several hours; then the lag issue happens again and the lag 
grows steadily until I manually restart the Flink job.

When I revert the application back to Flink 1.14.4, this lag issue completely 
goes away. I see no runtime errors or exceptions.

A few quick environment details:
- The Kafka brokers are running Kafka 2.8.1
- The Flink app is running on Kubernetes with the Spotify Flink Operator
- The Flink code is Java using the newer KafkaSource/KafkaSink API, not the 
older KafkaConsumer/KafkaProducer API.

The Flink app consumes from seven input Kafka topics, and for each distinct 
input topic, writes output values to a distinct output topic. Most of the 
processing happens within a RichAsyncFunction which does some processing 
against an external database. The lag issue mentioned here happens on different 
topics. And if I let the app run long enough, it will happen on multiple 
topics. Also, when the lag issue is happening, the app is still processing 
records on the affected topics. For some reason it processes records more 
slowly than the incoming message rate, which is the definition of lag. But 
clearly, the lag isn't caused by resources, but by a software bug within Flink.
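
The linear growth described above follows from a simple rate balance. As a 
rough model, assuming the incoming message rate r_in and the processing rate 
r_out (records per second; symbols introduced here for illustration) both stay 
roughly constant, the consumer lag L at time t is:

```latex
L(t) = L(t_0) + \left(r_{\text{in}} - r_{\text{out}}\right)\,(t - t_0)
```

So any sustained gap r_out < r_in makes the lag grow linearly with slope 
r_in - r_out, and the lag can only shrink, as it does after a restart, while 
the job processes faster than messages arrive.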

I intend to keep this job running Flink 1.14.4 until a Flink 1.15.1 patch comes 
out that hopefully addresses this issue. This job is not using or requiring 
any new Flink 1.15.0 functionality. However, we prefer to use the newest 
versions when we can. Switching Flink versions is just changing Maven 
dependencies, changing the base Flink Docker image version, and the Flink 
version tag specified to the Kubernetes Spotify Operator.

I hope this report gives the Flink developers a heads-up that a new bug was 
introduced in 1.15.0. If there is anything I should try, let me know. Thanks :)


Large backpressure and slow checkpoints in StateFun

2022-05-30 Thread Christopher Gustafson
Hi,

I am running some benchmarks using StateFun and have encountered a problem with 
backpressure and slow checkpoints that I can't figure out the reason for, and 
was hoping that someone might have an idea of what is causing it. My setup is 
the following:

I am running the Shopping Cart application from the StateFun playground. The 
job is submitted as an uber jar to an existing Flink cluster with 3 
TaskManagers and 1 JobManager. The functions are served using the Undertow 
example from the documentation, and I am using Kafka ingresses and egresses. My 
workload is only 1000 events/s. Everything runs on separate GCP VMs.

The issue is very long checkpoints, which I assume are caused by a 
backpressured ingress, in turn caused by the function dispatcher operator not 
being able to handle the workload. The only thing that has helped so far is 
increasing the parallelism of the job, but it feels like there is still some 
other bottleneck causing the issues. I have seen other benchmarks reach much 
higher throughput than 1000 events/s without more CPU or memory resources 
than I am using.

Any ideas about bottlenecks or ways to find them are greatly appreciated.

Best Regards,

Christopher Gustafson