Re: Backpressure handling in FileSource APIs - Flink 1.16

2023-05-24 Thread Kamal Mittal
Hello Shammon,

Can you please point out the classes where the slow-down logic for sources like
"FileSource" is placed?

Just wanted to understand it better and try it at my end by running various
performance runs, and also apply changes in my application if needed.

Rgds,
Kamal

On Wed, May 24, 2023 at 11:41 AM Kamal Mittal  wrote:

> Thanks Shammon for clarification.
>
> On Wed, May 24, 2023 at 11:01 AM Shammon FY  wrote:
>
>> Hi Kamal,
>>
>> The source will slow down when there is backpressure in the Flink job. You
>> can refer to docs [1] and [2] for more detailed information about the
>> backpressure mechanism.
>>
>> Currently there is no API or callback in the source for users to perform
>> customized operations on backpressure, but users can collect the job's
>> metrics and analyze them, for example the metrics in [1] and [3]. I hope
>> this helps.
>>
>> [1]
>> https://flink.apache.org/2021/07/07/how-to-identify-the-source-of-backpressure/#:~:text=Backpressure%20is%20an%20indicator%20that,the%20queues%20before%20being%20processed
>> .
>> [2]
>> https://www.alibabacloud.com/blog/analysis-of-network-flow-control-and-back-pressure-flink-advanced-tutorials_596632
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/monitoring/back_pressure/
>>
>> On Tue, May 23, 2023 at 9:40 PM Kamal Mittal  wrote:
>>
>>> Hello Community,
>>>
>>> Can you please share your views on the query asked above regarding back
>>> pressure for FileSource APIs for Bulk and Record stream formats? We are
>>> planning to use these APIs for AVRO-to-Parquet conversion and vice versa.
>>>
>>> Rgds,
>>> Kamal
>>>
>>> On Tue, 23 May 2023, 12:26 pm Kamal Mittal,  wrote:
>>>
 Added Flink community DL as well.

 -- Forwarded message -
 From: Kamal Mittal 
 Date: Tue, May 23, 2023 at 7:57 AM
 Subject: Re: Backpressure handling in FileSource APIs - Flink 1.16
 To: Shammon FY 


 Hello,

 Yes, we want to take some custom actions, and also to know whether there is
 any default behavior of slowing down sending data further in the pipeline or
 reading data from the source.

 Rgds,
 Kamal

 On Tue, May 23, 2023 at 6:06 AM Shammon FY  wrote:

> Hi Kamal,
>
> If I understand correctly, do you want the source to perform some custom
> actions, such as rate limiting, when there is backpressure in the job?
>
> Best,
> Shammon FY
>
>
> On Mon, May 22, 2023 at 2:12 PM Kamal Mittal 
> wrote:
>
>> Hello Community,
>>
>> Can you please share your views on the query asked above regarding back
>> pressure for FileSource APIs for Bulk and Record stream formats? We are
>> planning to use these APIs for AVRO-to-Parquet conversion and vice versa.
>>
>> Rgds,
>> Kamal
>>
>> On Thu, May 18, 2023 at 2:33 PM Kamal Mittal 
>> wrote:
>>
>>> Hello Community,
>>>
>>> Do the FileSource APIs for Bulk and Record stream formats handle back
>>> pressure in any way, for example by slowing down sending data further in
>>> the pipeline or reading data from the source?
>>> Or do they give any callback/handle so that an action can be taken?
>>> Can you please share details, if any?
>>>
>>> Rgds,
>>> Kamal
>>>
>>
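For context, a minimal sketch of the FileSource record-stream-format API discussed
above, reading Parquet files into Avro GenericRecords (the bulk-format variant is
built analogously via FileSource.forBulkFileFormat). The schema, input path, and job
name are placeholders, and it assumes flink-connector-files, flink-parquet, and
flink-avro are on a Flink 1.16 classpath; backpressure itself is applied by the
runtime's bounded buffers rather than by anything configured here.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetReaders;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParquetToAvroSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Minimal Avro schema, used only for this sketch.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");

        // Record stream format: each Parquet row is decoded into an Avro GenericRecord.
        FileSource<GenericRecord> source = FileSource
                .forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema),
                        new Path("file:///tmp/input-parquet"))
                .build();

        // Backpressure is handled by the runtime's bounded network buffers:
        // when downstream buffers fill up, the source reader stops emitting.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "parquet-file-source")
                .print();

        env.execute("FileSource Parquet-to-Avro sketch");
    }
}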


Web UI doesn't show up in Flink on YARN (Flink 1.17)

2023-05-24 Thread tan yao
Hi all,
  I found a strange thing with Flink 1.17 deployed on YARN (CDH 6.x): the Flink web
UI cannot show up from the YARN web link "ApplicationMaster", even when typing the
JobManager IP directly in the browser.
  When I run the WordCount application from the Flink 1.17 examples and click the
YARN web "ApplicationMaster" link, the Flink web UI just does not show up. But this
works in Flink 1.16 on the same environment.


Re: Why I can't run more than 19 tasks?

2023-05-24 Thread Shammon FY
Hi Hemi,

There may be two reasons that I can think of:
1. The number of connections exceeds the MySQL limit; you can check the options
in my.cnf for your MySQL server and increase the maximum number of connections.
2. A connection timeout for the MySQL client; you can try to add
'autoReconnect=true' to the connection URL.
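
As an illustration only, a sketch of how both suggestions might be wired into the
mysql-cdc source in Java (suggestion 1 additionally means raising max_connections on
the MySQL server itself). The hostname is taken from the error log in this thread;
the database, table, credentials, and pool size are placeholders, and the
jdbcProperties/connectionPoolSize builder options are assumed to be available in the
flink-connector-mysql-cdc version in use:

import java.util.Properties;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcConnectionTuning {
    public static MySqlSource<String> build() {
        // Suggestion 2: pass autoReconnect through to the JDBC connections the
        // connector opens (a standard MySQL Connector/J option).
        Properties jdbcProps = new Properties();
        jdbcProps.setProperty("autoReconnect", "true");

        return MySqlSource.<String>builder()
                .hostname("10.10.10.111")            // host from the error log above
                .port(3306)
                .databaseList("example_db")          // placeholder database
                .tableList("example_db.example_tbl") // placeholder table
                .username("flink_user")              // placeholder credentials
                .password("***")
                .jdbcProperties(jdbcProps)
                // If the builder version supports it, the per-source connection pool
                // size can be raised here instead of via flink-conf.yaml.
                .connectionPoolSize(40)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}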

Best,
Shammon FY


On Thu, May 25, 2023 at 8:32 AM Hemi Grs  wrote:

> hey everybody,
>
> I have a problem with my Apache Flink setup. I am synchronizing from MySQL to
> Elasticsearch, but it seems that I can't run more than 19 tasks. It gave me
> this error:
>
> --
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.util.FlinkRuntimeException:
> java.sql.SQLTransientConnectionException: connection-pool-10.10.10.111:3306
> - Connection is not available, request timed out after 3ms. at
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:64)
> at
> com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.discoveryCaptureTables(MySqlSnapshotSplitAssigner.java:171)
> ... 12 more
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> java.sql.SQLTransientConnectionException: connection-pool-10.10.10.111:3306
> - Connection is not available, request timed out after 3ms. at
> com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:72)
> at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) at
> io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885)
> at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:418) at
> com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:61)
> ... 13 moreCaused by: java.sql.SQLTransientConnectionException:
> connection-pool-10.10.10.111:3306 - Connection is not available, request
> timed out after 3ms.
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
> at
> com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
> at
> com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:59)
> ... 17 more
> -
>
> I have tried adding these 2 lines to flink-conf.yaml but it doesn't do anything:
> -
> env.java.opts:
> "-Dcom.ververica.cdc.connectors.mysql.hikari.maximumPoolSize=100"
> flink.connector.mysql-cdc.max-pool-size: 100
> -
>
> Does anybody know the solution?
> Additional info: my database is doing fine, because I tried creating another
> Apache Flink server and it can run another 19 tasks, so in total there are 38
> tasks running and it's doing fine. So how do I run many tasks on 1 server,
> given that the server still has lots of resources?
>
> Thanks
>


Why I can't run more than 19 tasks?

2023-05-24 Thread Hemi Grs
hey everybody,

I have a problem with my Apache Flink setup. I am synchronizing from MySQL to
Elasticsearch, but it seems that I can't run more than 19 tasks. It gave me
this error:

--
Caused by: org.apache.flink.util.FlinkRuntimeException:
org.apache.flink.util.FlinkRuntimeException:
java.sql.SQLTransientConnectionException: connection-pool-10.10.10.111:3306
- Connection is not available, request timed out after 3ms. at
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:64)
at
com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner.discoveryCaptureTables(MySqlSnapshotSplitAssigner.java:171)
... 12 more
Caused by: org.apache.flink.util.FlinkRuntimeException:
java.sql.SQLTransientConnectionException: connection-pool-10.10.10.111:3306
- Connection is not available, request timed out after 3ms. at
com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:72)
at io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:890) at
io.debezium.jdbc.JdbcConnection.connection(JdbcConnection.java:885)
at io.debezium.jdbc.JdbcConnection.connect(JdbcConnection.java:418) at
com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection(DebeziumUtils.java:61)
... 13 moreCaused by: java.sql.SQLTransientConnectionException:
connection-pool-10.10.10.111:3306 - Connection is not available, request
timed out after 3ms.
at
com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)
at
com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)
at
com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)
at
com.ververica.cdc.connectors.shaded.com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:100)
at
com.ververica.cdc.connectors.mysql.source.connection.JdbcConnectionFactory.connect(JdbcConnectionFactory.java:59)
... 17 more
-

I have tried adding these 2 lines to flink-conf.yaml but it doesn't do anything:
-
env.java.opts:
"-Dcom.ververica.cdc.connectors.mysql.hikari.maximumPoolSize=100"
flink.connector.mysql-cdc.max-pool-size: 100
-

Does anybody know the solution?
Additional info: my database is doing fine, because I tried creating another
Apache Flink server and it can run another 19 tasks, so in total there are 38
tasks running and it's doing fine. So how do I run many tasks on 1 server,
given that the server still has lots of resources?

Thanks


Re: Kafka Quotas & Consumer Group Client ID (Flink 1.15)

2023-05-24 Thread Mason Chen
Hi Hatem,

The reason for setting different client ids is due to Kafka client metrics
conflicts; the issue is documented here:
https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kafka/#kafka-consumer-metrics.
I think the warning log is benign if you are using Flink's metric system for
monitoring the Kafka connector, and it would be nice to introduce an option in
the connector to configure the same `client.id` across all tasks for the quota
feature you mentioned.

Best,
Mason
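
To make the client-id behaviour concrete, a minimal, non-authoritative sketch of
building a KafkaSource with a client-id prefix on Flink 1.15. The bootstrap servers,
topic, and group id are the placeholder values from the examples quoted below; the
connector derives each subtask's client.id from this prefix plus a per-subtask
suffix, which is why a quota keyed on one exact client.id will not match:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class QuotaFriendlyKafkaSource {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("kafka-server-address:9092") // placeholder
                .setTopics("topic_name")                          // placeholder
                .setGroupId("flink-consumer-group")               // from the example below
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Each task's client.id becomes this prefix plus a per-subtask
                // suffix, so Kafka quotas keyed on a single exact client.id will
                // not match; quotas would need to target each generated id or
                // fall back to the default client quota.
                .setClientIdPrefix("flink-consumer-group")
                .build();
    }
}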

On Wed, May 24, 2023 at 5:18 AM Hatem Mostafa  wrote:

> Hello Martijn,
>
> Yes, checkpointing is enabled and the offsets are committed without a
> problem. I think I might have figured out the answer to my second question
> based on my understanding of this code: Flink uses a low-level consumer that
> does not call consumer.subscribe, which makes the consumer group not appear
> to have active members in the kafka-consumer-groups tool. The consumer group
> functionality is fine though.
> However, I am more interested in an answer to my first question. Kafka quotas
> are one of the important features of Kafka, and since Flink sets a different
> client id for every consumer in the same consumer group, it is hard to set
> quotas for that consumer group. What is the reason behind setting different
> client ids?
>
> On Wed, May 24, 2023 at 1:13 PM Martijn Visser 
> wrote:
>
>> Hi Hatem,
>>
>> Could it be that you don't have checkpointing enabled? Flink only commits
>> its offset when a checkpoint has been completed successfully, as explained
>> on
>> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>>
>> Best regards,
>>
>> Martijn
>>
>>
>> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa  wrote:
>>
>>> Hello,
>>>
>>> I have two questions that are related to each other:
>>>
>>> *First question:*
>>>
>>> I have been trying to set `client.id` to set a Kafka client quota for
>>> consumer_byte_rate, since whenever our Kafka job gets redeployed it reads a
>>> lot of data from our Kafka cluster, causing a denial of service. However,
>>> `client.id` gets overridden by the Flink source here.
>>> How would I enforce quotas for the Flink Kafka source?
>>>
>>> *Second question:*
>>>
>>> Also, something I didn't quite understand: when describing our consumer
>>> group in Kafka, why don't I see the metadata for the consumer group
>>> (consumer id, client id & host)? I get that the consumer group has no
>>> active members, but it's actually active and consuming.
>>>
>>> *Example describing a flink consumer group*
>>>
 ./kafka-consumer-groups.sh --bootstrap-server
 kafka-server-address:9092   --describe --group flink-consumer-group
 Consumer group 'flink-consumer-group' has no active members.
 GROUP   TOPIC   PARTITION
  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST
  CLIENT-ID
 flink_consumer_group topic_name 1 514588965   514689721
   100756   -
  - -
>>>
>>>
>>>
>>> *Example describing a normal consumer group written using a confluent
>>> kafka python library.*
>>>
 ./kafka-consumer-groups.sh --bootstrap-server
 kafka-server-address:9092  --describe --group
 python_confluent_kafka_consumer
 GROUPTOPIC
  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID

   HOST   CLIENT-ID
 python_confluent_kafka_consumer topic_name   1
  17279532  17279908  376
 python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e
 / python_confluent_kafka_consumer_client_id
>>>
>>>
>>>
>>> I am using flink version 1.15.
>>>
>>> Thanks,
>>> Hatem
>>>
>>>
>>>
>>>


Re: Kafka Quotas & Consumer Group Client ID (Flink 1.15)

2023-05-24 Thread Hatem Mostafa
Hello Martijn,

Yes, checkpointing is enabled and the offsets are committed without a
problem. I think I might have figured out the answer to my second question
based on my understanding of this code: Flink uses a low-level consumer that
does not call consumer.subscribe, which makes the consumer group not appear
to have active members in the kafka-consumer-groups tool. The consumer group
functionality is fine though.
However, I am more interested in an answer to my first question. Kafka quotas
are one of the important features of Kafka, and since Flink sets a different
client id for every consumer in the same consumer group, it is hard to set
quotas for that consumer group. What is the reason behind setting different
client ids?

On Wed, May 24, 2023 at 1:13 PM Martijn Visser 
wrote:

> Hi Hatem,
>
> Could it be that you don't have checkpointing enabled? Flink only commits
> its offset when a checkpoint has been completed successfully, as explained
> on
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing
>
> Best regards,
>
> Martijn
>
>
> On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa  wrote:
>
>> Hello,
>>
>> I have two questions that are related to each other:
>>
>> *First question:*
>>
>> I have been trying to set `client.id` to set a Kafka client quota for
>> consumer_byte_rate, since whenever our Kafka job gets redeployed it reads a
>> lot of data from our Kafka cluster, causing a denial of service. However,
>> `client.id` gets overridden by the Flink source here.
>> How would I enforce quotas for the Flink Kafka source?
>>
>> *Second question:*
>>
>> Also, something I didn't quite understand: when describing our consumer
>> group in Kafka, why don't I see the metadata for the consumer group
>> (consumer id, client id & host)? I get that the consumer group has no
>> active members, but it's actually active and consuming.
>>
>> *Example describing a flink consumer group*
>>
>>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092
>>>  --describe --group flink-consumer-group
>>> Consumer group 'flink-consumer-group' has no active members.
>>> GROUP   TOPIC   PARTITION
>>>  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID HOST
>>>  CLIENT-ID
>>> flink_consumer_group topic_name 1 514588965   514689721
>>>   100756   -
>>>  - -
>>
>>
>>
>> *Example describing a normal consumer group written using a confluent
>> kafka python library.*
>>
>>> ./kafka-consumer-groups.sh --bootstrap-server
>>> kafka-server-address:9092  --describe --group
>>> python_confluent_kafka_consumer
>>> GROUPTOPIC
>>>  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG CONSUMER-ID
>>>
>>>   HOST   CLIENT-ID
>>> python_confluent_kafka_consumer topic_name   1
>>>  17279532  17279908  376
>>> python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e
>>> / python_confluent_kafka_consumer_client_id
>>
>>
>>
>> I am using flink version 1.15.
>>
>> Thanks,
>> Hatem
>>
>>
>>
>>


First Flink Embedded Stateful Functions taking long to get invoked

2023-05-24 Thread Chinthakrindi, Rakesh
Hi team,


We are exploring Flink stateful functions for one of our use cases. As part of a 
feasibility test, we are doing load testing to determine the time spent by the Flink 
app orchestrating the functions (E2E request latency minus the time spent in each 
stateful function). We have deployed a sample Flink app on AWS KDA, with 10 embedded 
stateful functions connected sequentially, each doing dummy work (a 10ms thread sleep).


We found an interesting insight: the time spent from receiving the input to invoking 
the first embedded stateful function is very high compared to the e2e time spent 
invoking the stateful functions.



With a 100 TPS load, the e2e time spent orchestrating the functions is around 2044ms 
at P99, while the e2e orchestration time excluding the time spent invoking the first 
function is 14ms at P99. There was no Flink backpressure and CPU usage was below 100% 
overall during the experiment.



Can someone help with the following queries?

  *   Is it expected for the Flink app to take considerable time to invoke the first 
stateful function after receiving a request from the input stream (Kinesis in our 
case)?
  *   Is there any way to speed up the invocation of the first stateful function?
  *   Any pointers on how to ensure a request is processed by only one host, 
irrespective of the number of stateful functions the request has to pass through?



Thanks
Rakesh


Re: Kafka Quotas & Consumer Group Client ID (Flink 1.15)

2023-05-24 Thread Martijn Visser
Hi Hatem,

Could it be that you don't have checkpointing enabled? Flink only commits
its offset when a checkpoint has been completed successfully, as explained
on
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#consumer-offset-committing

Best regards,

Martijn
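
For completeness, a minimal sketch of enabling checkpointing so that offsets get
committed back to Kafka; the interval and the placeholder pipeline are illustrative
only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka offsets are committed back to the consumer group only when a
        // checkpoint completes, so a job without checkpointing shows no committed
        // offsets in kafka-consumer-groups.sh.
        env.enableCheckpointing(60_000L); // 60s interval, illustrative value only

        // Placeholder pipeline; in the real job this would be the KafkaSource.
        env.fromElements("a", "b", "c").print();

        env.execute("checkpointing sketch");
    }
}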


On Tue, May 23, 2023 at 6:43 PM Hatem Mostafa  wrote:

> Hello,
>
> I have two questions that are related to each other:
>
> *First question:*
>
> I have been trying to set `client.id` to set a Kafka client quota for
> consumer_byte_rate, since whenever our Kafka job gets redeployed it reads a
> lot of data from our Kafka cluster, causing a denial of service. However,
> `client.id` gets overridden by the Flink source here.
> How would I enforce quotas for the Flink Kafka source?
>
> *Second question:*
>
> Also, something I didn't quite understand: when describing our consumer
> group in Kafka, why don't I see the metadata for the consumer group
> (consumer id, client id & host)? I get that the consumer group has no
> active members, but it's actually active and consuming.
>
> *Example describing a flink consumer group*
>
>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092
>>  --describe --group flink-consumer-group
>> Consumer group 'flink-consumer-group' has no active members.
>> GROUP                 TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
>> flink_consumer_group  topic_name  1          514588965       514689721       100756  -            -     -
>
>
>
> *Example describing a normal consumer group written using a confluent
> kafka python library.*
>
>> ./kafka-consumer-groups.sh --bootstrap-server kafka-server-address:9092
>> --describe --group python_confluent_kafka_consumer
>> GROUP                            TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                            HOST  CLIENT-ID
>> python_confluent_kafka_consumer  topic_name  1          17279532        17279908        376  python_confluent_kafka_consumer-345fa1d1-1f76-4e38-9aad-dcc120c5a52e  /     python_confluent_kafka_consumer_client_id
>
>
>
> I am using flink version 1.15.
>
> Thanks,
> Hatem
>
>
>
>


Re: Reading KafkaSource state from a savepoint using the State Processor API

2023-05-24 Thread Hang Ruan
Hi, Charles,

I usually read the state in debug mode. I always set a breakpoint at the return
statement in `SavepointReader#read`.
Then I can find the state I need in the field `SavepointMetadataV2
savepointMetadata`.
Finally, I can deserialize the state bytes with
`KafkaPartitionSplitSerializer`.

Best,
Hang
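
A rough sketch of the last step described above, assuming the raw bytes handed back
for the source-reader state are exactly the payload written by
KafkaPartitionSplitSerializer; how those bytes are wrapped inside the savepoint
(version headers, enumerator vs. reader state) can differ between Flink versions, so
the version handling here is a starting point to verify in the debugger rather than a
definitive recipe:

import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;

import java.io.IOException;

public class KafkaSplitStateDecoder {
    /**
     * Decodes one serialized KafkaPartitionSplit. Assumes splitBytes is exactly the
     * payload produced by KafkaPartitionSplitSerializer, i.e. any outer wrapping or
     * version header from the savepoint format has already been stripped.
     */
    public static KafkaPartitionSplit decode(byte[] splitBytes) throws IOException {
        KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();
        KafkaPartitionSplit split = serializer.deserialize(serializer.getVersion(), splitBytes);
        System.out.printf("%s starting at offset %d%n",
                split.getTopicPartition(), split.getStartingOffset());
        return split;
    }
}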

Charles Tan  于2023年5月24日周三 06:27写道:

> Hi everyone,
>
> I have a few questions about reading KafkaSource state using the State
> Processor API. I have a simple Flink application which reads from a Kafka
> topic then produces to a different topic. After running the Flink job and
> stopping it with a savepoint, I then write a few more records to the input
> topic. When the job is resumed from this savepoint, it reads records from
> the position it left off, indicating that the job successfully used the
> savepoint to recover its position. When I inspect the savepoint file with
> the state processor API, I can read the "SourceReaderState" from the
> savepoint. However, the state is read as a Java byte array and I can't
> decode it or make any sense of it. I want to be able to read the savepoint
> state to find out exactly how much progress (partition/offset) a job has
> made in case it fails or is stopped.
>
> Does anyone have any ideas how I can deserialize the bytes from the Kafka
> source state or more generically how to read the Kafka source operator
> state from a savepoint?
>
> Here is the link to a github repository that contains the Flink job that I
> was running, a savepoint file, and the code I was using to try to read the
> savepoint. (https://github.com/charles-tan/flink-state-processor-example)
>
> Thanks,
> Charles
>