Re: Java 17 incompatibilities with Flink 1.18.1 version

2024-05-30 Thread Zhanghao Chen
Hi Rajat,

There's no need to obtain Flink libraries compiled with JDK 17. The only 
things you need to do are:


  1. Configure the JAVA_HOME env to use JDK 17 on both the client and server side.
  2. Configure the Flink JVM options properly to cooperate with the JDK 
modularization. The default configuration in the Flink distribution is already 
set up such that Flink itself works on Java 17.
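For illustration, a minimal sketch of what such JVM options look like in the Flink
configuration. The key name and the specific --add-opens entries below are
illustrative assumptions; the Flink 1.18 distribution already ships with a suitable
default list, so normally you do not need to change anything:

# flink-conf.yaml (sketch; verify against the defaults shipped with your distribution)
env.java.opts.all: --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED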

Best,
Zhanghao Chen

From: Rajat Pratap 
Sent: Thursday, May 30, 2024 13:17
To: Zhanghao Chen 
Subject: Re: Java 17 incompatibilities with Flink 1.18.1 version

Hi Zhanghao,

Thank you for your email. I have a few questions regarding migrating our Flink 
jobs from JDK 8 to 17.
How do we get Flink libraries compiled with JDK 17? Do we have to build them 
ourselves, or are there Maven artifacts compiled with JDK 17? Our Flink jobs are 
on JDK 8 on the client side; how do we migrate to JDK 17 with the Flink-provided jars?

Your assistance is much appreciated.

Regards,
Rajat

On Wed, May 29, 2024, 6:16 AM Zhanghao Chen 
<zhanghao.c...@outlook.com> wrote:
Hi Rajat,

Flink releases are compiled with JDK 8, but they are able to run on JDK 8-17. As 
long as your Flink runs on JDK 17 on both the server and client side, you are free 
to write your Flink jobs in Java 17.

Best,
Zhanghao Chen

From: Rajat Pratap <rajat789pra...@gmail.com>
Sent: Tuesday, May 28, 2024 15:28
To: user@flink.apache.org <user@flink.apache.org>
Subject: Java 17 incompatibilities with Flink 1.18.1 version

Hi Team,

I am writing Flink jobs with the latest release version of Flink (1.18.1). The 
jobmanager is also deployed with the same version build. However, we faced 
issues when we deployed the jobs.

On further investigation, I noticed all libraries from Flink have Build-Jdk 
1.8. I have the following questions:
 - Is this the correct version for writing Java 17 Flink jobs?
 - How do we write Flink jobs with Java 17 code, and which library versions should we use?
 - Is Java 17 even supported on the client side, or is it only supported on the 
server side?

Your urgent help is much appreciated.

Regards,
Rajat



Re: Re: Is there a way to rate-limit Flink SQL consumption from a Kafka topic?

2024-05-28 Thread Zhanghao Chen
That should be doable. Also, rate-limiting logic was once implemented in an older version of the Kafka connector [1], which you can refer to. I think this requirement is fairly common, so feel free to open a JIRA for it.

[1] https://issues.apache.org/jira/browse/FLINK-11501

Best,
Zhanghao Chen

From: casel.chen 
Sent: Tuesday, May 28, 2024 22:00
To: user-zh@flink.apache.org 
Subject: Re: Is there a way to rate-limit Flink SQL consumption from a Kafka topic?

I checked the Flink source code. The current DataGeneratorSource takes a RateLimiterStrategy parameter, but KafkaSource does not have such a parameter. Can rate limiting be implemented the same way as in DataGeneratorSource?

public DataGeneratorSource(
        GeneratorFunction<Long, OUT> generatorFunction,
        long count,
        RateLimiterStrategy rateLimiterStrategy,
        TypeInformation<OUT> typeInfo) {...}
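For illustration, a minimal sketch of throttling right after a KafkaSource, since
KafkaSource itself has no RateLimiterStrategy parameter. The class name ThrottlingMap
and the rate value are made up for this example; note that sleeping in map() also
delays checkpoint barriers on that subtask, so use with care:

import org.apache.flink.api.common.functions.RichMapFunction;

public class ThrottlingMap<T> extends RichMapFunction<T, T> {
    private final double recordsPerSecondPerSubtask;
    private transient long nextEmitNanos;

    public ThrottlingMap(double recordsPerSecondPerSubtask) {
        this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
    }

    @Override
    public T map(T value) throws Exception {
        long intervalNanos = (long) (1_000_000_000L / recordsPerSecondPerSubtask);
        long now = System.nanoTime();
        if (nextEmitNanos < now) {
            nextEmitNanos = now;                 // no backlog: emit immediately
        } else {
            long sleepNanos = nextEmitNanos - now;
            Thread.sleep(sleepNanos / 1_000_000, (int) (sleepNanos % 1_000_000));
        }
        nextEmitNanos += intervalNanos;          // reserve the next emission slot
        return value;
    }
}

// Usage sketch: env.fromSource(kafkaSource, watermarks, "kafka")
//     .map(new ThrottlingMap<>(1000.0))        // ~1000 records/s per subtask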

















On 2024-05-27 23:47:40, "casel.chen" wrote:
>Is there a way to rate-limit Flink SQL consumption from a Kafka topic? The scenario is consuming a Kafka 
>topic and writing the data to a downstream MongoDB. During business peak hours the downstream MongoDB is under heavy write pressure, so we would like to throttle the Kafka consumption. How can this be achieved?


Re: Java 17 incompatibilities with Flink 1.18.1 version

2024-05-28 Thread Zhanghao Chen
Hi Rajat,

Flink releases are compiled with JDK 8, but they are able to run on JDK 8-17. As 
long as your Flink runs on JDK 17 on both the server and client side, you are free 
to write your Flink jobs in Java 17.

Best,
Zhanghao Chen

From: Rajat Pratap 
Sent: Tuesday, May 28, 2024 15:28
To: user@flink.apache.org 
Subject: Java 17 incompatibilities with Flink 1.18.1 version

Hi Team,

I am writing Flink jobs with the latest release version of Flink (1.18.1). The 
jobmanager is also deployed with the same version build. However, we faced 
issues when we deployed the jobs.

On further investigation, I noticed all libraries from Flink have Build-Jdk 
1.8. I have the following questions:
 - Is this the correct version for writing Java 17 Flink jobs?
 - How do we write Flink jobs with Java 17 code, and which library versions should we use?
 - Is Java 17 even supported on the client side, or is it only supported on the 
server side?

Your urgent help is much appreciated.

Regards,
Rajat



Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

2024-05-23 Thread Zhanghao Chen
Hi John,

Based on the memory config screenshot provided before, each of your TMs should 
have MaxDirectMemory = 1 GB (network mem) + 128 MB (framework off-heap) = 1152 MB. 
Neither taskmanager.memory.flink.size nor the total including MaxDirectMemory will 
exceed the pod's physical memory. You may check the detailed TM memory model [1] and 
double-check for yourself.

Maybe you can further analyze the direct memory usage using tools like JVM 
Native Memory Tracking (NMT).

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/memory/mem_setup_tm/#detailed-memory-model
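For illustration, a sketch of enabling NMT. These are standard JVM/JDK flags and tools;
the config key used to pass them to the TM is an assumption to adapt to your deployment:

# Add to the TM JVM options, e.g. via env.java.opts.taskmanager in flink-conf.yaml:
-XX:NativeMemoryTracking=summary
# Then, on the TM host, inspect the running JVM (replace <pid> with the TM process id):
jcmd <pid> VM.native_memory summary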

Best,
Zhanghao Chen

From: John Smith 
Sent: Thursday, May 23, 2024 22:40
To: Zhanghao Chen 
Cc: Biao Geng ; user 
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Based on these two settings...
taskmanager.memory.flink.size: 16384m
taskmanager.memory.jvm-metaspace.size: 3072m

Reading the docs here I'm not sure how to calculate the formula. My suspicion 
is that I may have allocated too much of taskmanager.memory.flink.size and the 
total including MaxDirectMemory is more than what the physical OS has, is that 
possible?
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#taskmanager-memory-task-heap-size

Are you able to tell me what these numbers come out for this formula 
MaxDirectMemory of TM = Network Memory + Task Off-Heap + Framework Off-heap?

On Thu, May 23, 2024 at 9:01 AM John Smith 
<java.dev@gmail.com> wrote:
Ok, but I still don't get why it's doing it... It's the same version of 
Flink... The only differences are Java 11, and that I allocated more JVM heap and the 
actual physical machine has more RAM. Maybe I should reduce the JVM heap by a 
gigabyte or two?

On Wed, May 22, 2024, 12:37 PM Zhanghao Chen 
<zhanghao.c...@outlook.com> wrote:
Hi John,

A side note here: Flink will set the MaxDirectMemory of the TM = Network Memory + 
Task Off-Heap + Framework Off-Heap, and this overrides the JVM's default setting, 
regardless of the version of the JVM.

Best,
Zhanghao Chen

From: John Smith <java.dev@gmail.com>
Sent: Wednesday, May 22, 2024 22:56
To: Biao Geng <biaoge...@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Would Java 11 cause Getting OutOfMemoryError: Direct buffer memory?

Hi, apologies I hit reply instead of reply all. So not sure who saw this or 
didn't. We have not switched to SSL and also our assumption here would be that 
if we did switch to SSL the jobs would not work or produce invalid results. The 
jobs work absolutely fine for a week or so and then they fail.

Here is the consumer config from the task logs, which says PLAINTEXT and port 
9092 is used. I also attached a screenshot of the task manager memory usage, and 
I read up on the MaxDirectMemory setting of Java 8 vs Java 11: Java 8 by 
default calculates the direct memory size as 87% of the max heap size, while 
Java 11 sets it to 100% of the max heap size.

[Screen Shot 2024-05-22 at 9.50.38 AM.png]

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [xx-kafka-0001:9092, xx-0002:9092, 
xx-kafka-0003:9092]
check.crcs = true
client.dns.lookup = default
client.id = xx
client.rack =
connections.max.idle.ms = 54
default.api.timeout.ms = 6
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = xx
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 30
max.poll.records = 500
metadata.max.age.ms = 30
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 3
partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 6
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 6
sasl.kerberos.service.name = null
sa

Re: Task Manager memory usage

2024-05-23 Thread Zhanghao Chen
Hi Sigalit,

States stored in memory will most probably stay alive for several 
rounds of GC and end up in the old generation of the heap, and they won't get recycled until 
a Full GC.

As for the TM pod memory usage, most probably it will stop increasing at some 
point. You could try setting a larger taskmanager.memory.jvm-overhead memory, 
and monitor it for a longer time. If that's not the case, then there might be 
native memory leakage somewhere, but that may not be related to the state.
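For illustration, a sketch of bumping the JVM overhead budget; the option keys are the
standard Flink ones already listed in your config below, while the values are
assumptions to adapt to your setup:

# flink-conf.yaml (illustrative values)
taskmanager.memory.jvm-overhead.min: 1g
taskmanager.memory.jvm-overhead.max: 3g
taskmanager.memory.jvm-overhead.fraction: 0.15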

Best,
Zhanghao Chen

From: Sigalit Eliazov 
Sent: Thursday, May 23, 2024 18:20
To: user 
Subject: Task Manager memory usage


Hi,

I am trying to understand the following behavior in our Flink application 
cluster. Any assistance would be appreciated.

We are running a Flink application cluster with 5 task managers, each with the 
following configuration:

  *   jobManagerMemory: 12g
  *   taskManagerMemory: 20g
  *   taskManagerMemoryHeapSize: 12g
  *   taskManagerMemoryNetworkMax: 4g
  *   taskManagerMemoryNetworkMin: 1g
  *   taskManagerMemoryManagedSize: 50m
  *   taskManagerMemoryOffHeapSize: 2g
  *   taskManagerMemoryNetworkFraction: 0.2
  *   taskManagerNetworkMemorySegmentSize: 4mb
  *   taskManagerMemoryFloatingBuffersPerGate: 64
  *   taskmanager.memory.jvm-overhead.min: 256mb
  *   taskmanager.memory.jvm-overhead.max: 2g
  *   taskmanager.memory.jvm-overhead.fraction: 0.1

Our pipeline includes stateful transformations, and we are verifying that we 
clear the state once it is no longer needed.

Through the Flink UI, we observe that the heap size increases and decreases 
during the job lifecycle.

However, there is a noticeable delay between clearing the state and the 
reduction in heap size usage, which I assume is related to the garbage 
collector frequency.

What is puzzling is the task manager pod memory usage. It appears that the 
memory usage increases intermittently and is not released. We verified the 
different state metrics and confirmed they are changing according to the logic.

Additionally, if we had a state that was never released, I would expect to see 
the heap size increasing constantly as well.

Any insights or ideas?

Thanks,

Sigalit


Re: Parallelism for auto-scaling, memory for auto-tuning - Flink operator

2024-04-17 Thread Zhanghao Chen
If you have some prior experience, I'd recommend setting a good parallelism 
and TM resource spec first, to give the autotuner a good starting point. 
Usually, the autoscaler can tune your jobs well within a few attempts (<=3). As 
for `pekko.ask.timeout`, the default value should be sufficient in most cases.

Best,
Zhanghao Chen

From: Maxim Senin via user 
Sent: Thursday, April 18, 2024 5:56
To: user@flink.apache.org 
Subject: Parallelism for auto-scaling, memory for auto-tuning - Flink operator


Hi.

Does it make sense to specify `parallelism` for task managers or the `job`, 
and, similarly, to specify memory amount for the task managers, or it’s better 
to leave it to autoscaler and autotuner to pick the best values? How many times 
would the autoscaler need to restart task managers before it picks the right 
values? Does `pekko.ask.timeout` need to be sufficient for task managers to get 
into running state with all the restarts?



Cheers,

Maxim



COGILITY SOFTWARE CORPORATION LEGAL DISCLAIMER: The information in this email 
is confidential and is intended solely for the addressee. Access to this email 
by anyone else is unauthorized. If you are not the intended recipient, any 
disclosure, copying, distribution or any action taken or omitted to be taken in 
reliance on it, is prohibited and may be unlawful.


Re: Flink job performance

2024-04-15 Thread Zhanghao Chen
The exception basically says the remote TM is unreachable, probably because it terminated 
for some other reason. This may not be the root cause. Are there any other 
exceptions in the log? Also, since the overall resource usage is almost full, 
could you try allocating more CPUs and see if the instability persists?

Best,
Zhanghao Chen

From: Oscar Perez 
Sent: Monday, April 15, 2024 19:24
To: Zhanghao Chen 
Cc: Oscar Perez via user 
Subject: Re: Flink job performance

Hei, ok that is weird. Let me resend them.

Regards,
Oscar

On Mon, 15 Apr 2024 at 14:00, Zhanghao Chen 
<zhanghao.c...@outlook.com> wrote:
Hi, there seems to be sth wrong with the two images attached in the latest 
email. I cannot open them.

Best,
Zhanghao Chen

From: Oscar Perez via user <user@flink.apache.org>
Sent: Monday, April 15, 2024 15:57
To: Oscar Perez via user <user@flink.apache.org>; 
pi-team <pi-t...@n26.com>; Hermes Team 
<hermes-t...@n26.com>
Subject: Flink job performance

Hi community!

We have an interesting problem with Flink after increasing parallelism in a 
certain way. Here is the summary:

1) We identified that our job bottleneck was some co-keyed process operators 
that were back-pressuring the upstream operators.
2) What we did was to increase the parallelism of all the operators from 6 to 
12, but keep it at 6 for the operators that read from Kafka. The main reason was that 
all our topics have 6 partitions, so increasing the parallelism there would not yield 
better performance.

See attached job layout prior to and after the changes:
What happened was that some operations that were chained in the same operator, 
like reading - filter - map - filter, are now rebalanced, and the overall 
performance of the job is suffering (it keeps throwing exceptions now and then).

Is the rebalance operation going over the network, or does this happen on the same 
node? How can we effectively improve the performance of this job with the given 
resources?

Thanks for the input!
Regards,
Oscar




Re: Flink job performance

2024-04-15 Thread Zhanghao Chen
Hi, there seems to be sth wrong with the two images attached in the latest 
email. I cannot open them.

Best,
Zhanghao Chen

From: Oscar Perez via user 
Sent: Monday, April 15, 2024 15:57
To: Oscar Perez via user ; pi-team ; 
Hermes Team 
Subject: Flink job performance

Hi community!

We have an interesting problem with Flink after increasing parallelism in a 
certain way. Here is the summary:

1) We identified that our job bottleneck was some co-keyed process operators 
that were back-pressuring the upstream operators.
2) What we did was to increase the parallelism of all the operators from 6 to 
12, but keep it at 6 for the operators that read from Kafka. The main reason was that 
all our topics have 6 partitions, so increasing the parallelism there would not yield 
better performance.

See attached job layout prior to and after the changes:
What happened was that some operations that were chained in the same operator, 
like reading - filter - map - filter, are now rebalanced, and the overall 
performance of the job is suffering (it keeps throwing exceptions now and then).

Is the rebalance operation going over the network, or does this happen on the same 
node? How can we effectively improve the performance of this job with the given 
resources?

Thanks for the input!
Regards,
Oscar




Re: Pyflink Performance and Benchmark

2024-04-15 Thread Zhanghao Chen
When it comes down to the actual runtime, what really matters is the plan 
optimization and the operator implementation & shuffling. You might be interested in this 
blog: 
https://flink.apache.org/2022/05/06/exploring-the-thread-mode-in-pyflink/, 
which benchmarked the latter on a common JSON-processing 
scenario with UDFs in Java / Python under thread mode / Python under process mode.


Best,
Zhanghao Chen


From: Niklas Wilcke
Sent: Monday, April 15, 2024 15:17
To: user
Subject: Pyflink Performance and Benchmark

Hi Flink Community,

I wanted to reach out to you to get some input about PyFlink performance. Are 
there any resources available about PyFlink benchmarks, and maybe a comparison 
with the Java API? I wasn't able to find anything valuable, but maybe I missed 
something?
I am aware that benchmarking in this case is heavily use-case dependent and that a 
general statement is difficult. I'm rather looking for numbers to get a first 
impression, or maybe a framework to do some benchmarking on my own. Any help is 
highly appreciated. Thank you!

Kind regards,
Niklas



Re: Flink job performance

2024-04-15 Thread Zhanghao Chen
Hi Oscar,

The rebalance operation will go over the network stack, but it does not necessarily 
involve remote data shuffling. For data shuffling between tasks on the same 
node, the local channel is used, but compared to chained operators, it still 
introduces extra data serialization overhead. For data shuffling between tasks 
on different nodes, remote network shuffling is involved.

Therefore, breaking the chain with an extra rebalance operation will definitely 
add extra overhead. But usually, it is negligible under a small parallelism 
setting like yours. Could you share the exception details thrown after the 
change?

From: Oscar Perez via user 
Sent: Monday, April 15, 2024 15:57
To: Oscar Perez via user ; pi-team ; 
Hermes Team 
Subject: Flink job performance

Hi community!

We have an interesting problem with Flink after increasing parallelism in a 
certain way. Here is the summary:

1) We identified that our job bottleneck was some co-keyed process operators 
that were back-pressuring the upstream operators.
2) What we did was to increase the parallelism of all the operators from 6 to 
12, but keep it at 6 for the operators that read from Kafka. The main reason was that 
all our topics have 6 partitions, so increasing the parallelism there would not yield 
better performance.

See attached job layout prior to and after the changes:
What happened was that some operations that were chained in the same operator, 
like reading - filter - map - filter, are now rebalanced, and the overall 
performance of the job is suffering (it keeps throwing exceptions now and then).

Is the rebalance operation going over the network, or does this happen on the same 
node? How can we effectively improve the performance of this job with the given 
resources?

Thanks for the input!
Regards,
Oscar




Re: How to enable RocksDB native metrics?

2024-04-11 Thread Zhanghao Chen
Adding a space between -yD and the param should do the trick.
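For illustration, the submission from the quoted email would then look like this
(unchanged apart from the added space after -yD):

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu default -p 8 \
  -yD state.backend.latency-track.keyed-state-enabled=true \
  -c com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar \
  --sourceTopic dwd_audio_record --groupId clean_wl_ --sourceServers x.x.x.x:9092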

Best,
Zhanghao Chen

From: Lei Wang 
Sent: Thursday, April 11, 2024 19:40
To: Zhanghao Chen 
Cc: Biao Geng ; user 
Subject: Re: How to enable RocksDB native metrics?

Hi Zhanghao,

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu 
default -p 8  -yDstate.backend.latency-track.keyed-state-enabled=true -c 
com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic 
dwd_audio_record  --groupId clean_wl_ --sourceServers  x.x.x.x:9092

Tried, it doesn't work, the error is:
Could not get job jar and dependencies from JAR file: JAR file does not exist: 
-yDstate.backend.latency-track.keyed-state-enabled=true

Thanks,
Lei

On Thu, Apr 11, 2024 at 5:19 PM Zhanghao Chen 
mailto:zhanghao.c...@outlook.com>> wrote:
Hi Lei,

You are using an old-styled CLI for YARN jobs where "-yD" instead of "-D" 
should be used.

From: Lei Wang mailto:leiwang...@gmail.com>>
Sent: Thursday, April 11, 2024 12:39
To: Biao Geng mailto:biaoge...@gmail.com>>
Cc: user mailto:user@flink.apache.org>>
Subject: Re: How to enable RocksDB native metrics?

Hi Biao,

I  tried, it  doesn't work. The cmd is:

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu 
default -p 8  -Dstate.backend.latency-track.keyed-state-enabled=true -c 
com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic 
dwd_audio_record  --groupId clean_wl_ --sourceServers  x.x.x.x:9092

Seems the -D param is ignored. Even if I change the param to a wrong spelling, the 
job is submitted successfully.
Any suggestions on this?

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:48 AM Biao Geng 
mailto:biaoge...@gmail.com>> wrote:
Hi Lei,
You can use the "-D" option in the command line to set configs for a specific 
job. E.g, `flink run-application -t yarn-application  
-Djobmanager.memory.process.size=1024m  `.
See 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/ 
for more details.

Best,
Biao Geng

Marco Villalobos mailto:mvillalo...@kineteque.com>> 
于2024年4月8日周一 09:22写道:
Hi Lei,

Have you tried enabling these Flink configuration properties?

https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics

Sent from my iPhone

On Apr 7, 2024, at 6:03 PM, Lei Wang 
mailto:leiwang...@gmail.com>> wrote:


I  want to enable it only for specified jobs, how can I specify the   
configurations on  cmd line when submitting a job?

Thanks,
Lei

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan 
mailto:zakelly@gmail.com>> wrote:
Hi Lei,

You can enable it by some configurations listed in: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
   (RocksDB Native Metrics)


Best,
Zakelly

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan 
mailto:zakelly@gmail.com>> wrote:
Hi Lei,

You can enable it by some configurations listed in: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
   (RocksDB Native Metrics)


Best,
Zakelly

On Sun, Apr 7, 2024 at 4:47 PM Lei Wang 
mailto:leiwang...@gmail.com>> wrote:

Using big state and want to do some performance tuning, how can i enable 
RocksDB native metrics?

I  am using  Flink 1.14.4

Thanks,
Lei


Re: How to enable RocksDB native metrics?

2024-04-11 Thread Zhanghao Chen
Hi Lei,

You are using an old-styled CLI for YARN jobs where "-yD" instead of "-D" 
should be used.

From: Lei Wang 
Sent: Thursday, April 11, 2024 12:39
To: Biao Geng 
Cc: user 
Subject: Re: How to enable RocksDB native metrics?

Hi Biao,

I  tried, it  doesn't work. The cmd is:

flink run -m yarn-cluster -ys 4 -ynm EventCleaning_wl -yjm 2G -ytm 16G -yqu 
default -p 8  -Dstate.backend.latency-track.keyed-state-enabled=true -c 
com.zkj.task.EventCleaningTask SourceDataCleaning-wl_0410.jar --sourceTopic 
dwd_audio_record  --groupId clean_wl_ --sourceServers  x.x.x.x:9092

Seems the -D param is ignored. Even if I change the param to a wrong spelling, the 
job is submitted successfully.
Any suggestions on this?

Thanks,
Lei

On Mon, Apr 8, 2024 at 9:48 AM Biao Geng 
mailto:biaoge...@gmail.com>> wrote:
Hi Lei,
You can use the "-D" option in the command line to set configs for a specific 
job. E.g, `flink run-application -t yarn-application  
-Djobmanager.memory.process.size=1024m  `.
See 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/ 
for more details.

Best,
Biao Geng

Marco Villalobos mailto:mvillalo...@kineteque.com>> 
于2024年4月8日周一 09:22写道:
Hi Lei,

Have you tried enabling these Flink configuration properties?


https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics

Sent from my iPhone

On Apr 7, 2024, at 6:03 PM, Lei Wang 
mailto:leiwang...@gmail.com>> wrote:


I  want to enable it only for specified jobs, how can I specify the   
configurations on  cmd line when submitting a job?

Thanks,
Lei

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan 
mailto:zakelly@gmail.com>> wrote:
Hi Lei,

You can enable it by some configurations listed in: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
   (RocksDB Native Metrics)


Best,
Zakelly

On Sun, Apr 7, 2024 at 4:59 PM Zakelly Lan 
mailto:zakelly@gmail.com>> wrote:
Hi Lei,

You can enable it by some configurations listed in: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics
   (RocksDB Native Metrics)


Best,
Zakelly

On Sun, Apr 7, 2024 at 4:47 PM Lei Wang 
mailto:leiwang...@gmail.com>> wrote:

Using big state and want to do some performance tuning, how can i enable 
RocksDB native metrics?

I  am using  Flink 1.14.4

Thanks,
Lei


Re: Debugging Kryo Fallback

2024-04-09 Thread Zhanghao Chen
Hi, you may enable the Kryo fallback option first, submit the job, and 
search the logs for "be processed as GenericType". Flink will print it in most cases 
where we fall back to Kryo (with a few exceptions, including the types Class, Object, 
recursive types, and interfaces).

Best,
Zhanghao Chen

From: Salva Alcántara 
Sent: Monday, April 8, 2024 16:01
To: Yunfeng Zhou 
Cc: user@flink.apache.org 
Subject: Re: Debugging Kryo Fallback

Yeah I think you're right and there is no need for anything, really.

I was thinking of having more user friendly tests for my POJOs for which I 
checked the Kryo Fallback and if detected provide an exhaustive list of issues 
found (vs raising an exception for the first problem, requiring users to fix 
it, then see if there are more issues/exceptions, fix them and so on).

Salva

On Mon, Apr 8, 2024 at 8:46 AM Yunfeng Zhou 
mailto:flink.zhouyunf...@gmail.com>> wrote:
Hi Salva,

Could you please give me some hint about the issues Flink can collect
apart from the exception and the existing logs? Suppose we record the
exception in the log and the Flink job continues, I can imagine that
similar Kryo exceptions from each of the rest records will then appear
in the log as well. They expose the same bug to fix so there is no
need to collect all of them. Would there be other issues to collect
apart from these repeating similar exceptions?

Best regards,
Yunfeng

On Sun, Apr 7, 2024 at 5:15 PM Salva Alcántara 
mailto:salcantara...@gmail.com>> wrote:
>
> Thanks Yunfeng! That is more or less what I do now when I run into the 
> problem. This approach reports problems one at a time (an exception is raised 
> on the first problem encountered).
>
> Instead of that, I think accumulating all the issues and presenting them all 
> at once would be more user friendly.
>
> Regards,
>
> Salva
>
> On Sun, Apr 7, 2024 at 5:43 AM Yunfeng Zhou 
> mailto:flink.zhouyunf...@gmail.com>> wrote:
>>
>> Hi Salva,
>>
>> According to the description of the configuration
>> `pipeline.generic-types`, after setting this to false you should be
>> able to find UnsupportedOperationException in the Flink log describing
>> the data types that have not been supported in Kryo. Then you should
>> be able to look into your code finding out the usages of the certain
>> data type and perform corresponding bug fixes. Other information
>> provided in Flink's log, like those info-level logs in
>> TypeExtractor@analyzePojo, might help reveal more details around the
>> exception.
>>
>> Best,
>> Yunfeng
>>
>> On Wed, Apr 3, 2024 at 4:19 PM Salva Alcántara 
>> mailto:salcantara...@gmail.com>> wrote:
>> >
>> > FYI Reposted in SO:
>> > - 
>> > https://stackoverflow.com/questions/78265380/how-to-debug-the-kryo-fallback-in-flink
>> >
>> > On Thu, Mar 28, 2024 at 7:24 AM Salva Alcántara 
>> > mailto:salcantara...@gmail.com>> wrote:
>> >>
>> >> I wonder which is the simplest way of troubleshooting/debugging what 
>> >> causes the Kryo fallback.
>> >>
>> >> Detecting it is just a matter of adding this line to your job:
>> >>
>> >> ```
>> >> env.getConfig().disableGenericTypes();
>> >> ```
>> >>
>> >> or in more recent versions:
>> >>
>> >> ```
>> >> pipeline.generic-types: false
>> >>
>> >> ```
>> >>
>> >> But once you detect the issue, what is the simplest way to debug it? You 
>> >> can of course add a breakpoint in:
>> >> org.apache.flink.api.java.typeutils.TypeExtractor@analyzePojo
>> >>
>> >> but ideally there should be a simpler way to show all the problems 
>> >> encountered to the user without having to get that deep into the code.
>> >>
>> >> Thanks in advance,
>> >>
>> >> Salva


Re: Reply: Unsubscribe

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 戴少 
Sent: Monday, April 1, 2024 11:10
To: user-zh 
Cc: user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com 
;
 user-zh-subscribe ; user-zh 

Subject: Reply: Unsubscribe

Unsubscribe

--

Best Regards,




 Original message 
| From | 李一飞 |
| Sent | 2024-03-14 00:09 |
| To | 
user-zh-sc.1618840368.ibekedaekejgeemingfn-kurisu_li=163.com,
user-zh-subscribe ,
user-zh  |
| Subject | Unsubscribe |
Unsubscribe




Re: Unsubscribe

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: zjw 
Sent: Monday, April 1, 2024 11:05
To: user-zh@flink.apache.org 
Subject: Unsubscribe




Re: Re:Re: Re: Custom data source in Flink 1.19

2024-03-31 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe. You can refer to [1] to manage your mailing list subscriptions.

[1]
https://flink.apache.org/zh/community/#%e9%82%ae%e4%bb%b6%e5%88%97%e8%a1%a8

Best,
Zhanghao Chen

From: 熊柱 <18428358...@163.com>
Sent: Monday, April 1, 2024 11:14
To: user-zh@flink.apache.org 
Subject: Re:Re: Re: Custom data source in Flink 1.19

Unsubscribe

















On 2024-03-28 19:56:06, "Zhanghao Chen" wrote:
>If you are using the DataStream API, you can also check whether the newly added DataGen Connector [1] can directly satisfy your test-data generation needs.
>
>
>[1] 
>https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
>
>Best,
>Zhanghao Chen
>
>From: ha.fen...@aisino.com 
>Sent: Thursday, March 28, 2024 15:34
>To: user-zh 
>Subject: Re: Re: Custom data source in Flink 1.19
>
>What I want to ask is: if we need to implement the Source interface, how should it be written? Is there a concrete example that implements generating a custom class at a certain rate?
>
>From: gongzhongqiang
>Sent: 2024-03-28 15:05
>To: user-zh
>Subject: Re: Custom data source in Flink 1.19
>Hello:
>
>In the current Flink 1.19 release, SourceFunction is only marked as deprecated; it will be removed in a future
>version. So, to keep your application compatible with Flink versions in the long run, you can re-implement these SourceFunctions with the new Source interface.
>
>ha.fen...@aisino.com wrote on Thursday, March 28, 2024 at 14:18:
>
>>
>> Previously we extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is already marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the original SourceFunction. How should we generate a custom data source for testing?
>>


Re: One query just for curiosity

2024-03-28 Thread Zhanghao Chen
Yes. However, a huge parallelism would require additional coordination cost so 
you might need to set up the JobManager with a decent spec (at least 8C 16G by 
experience). Also, you'll need to make sure there's no external bottlenecks 
(e.g. reading/writing data from the external storage).

Best,
Zhanghao Chen

From: Ganesh Walse 
Sent: Friday, March 29, 2024 10:42
To: Zhanghao Chen 
Cc: user@flink.apache.org 
Subject: Re: One query just for curiosity

You mean to say we can process 32767 records in parallel? And may I know, if 
this is the case, whether we need to do anything for this.

On Fri, 29 Mar 2024 at 8:08 AM, Zhanghao Chen 
mailto:zhanghao.c...@outlook.com>> wrote:
Flink can be scaled up to a parallelism of 32767 at max. And if your record 
processing is mostly IO-bound, you can further boost the throughput via 
Async-IO [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
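For illustration, a minimal sketch of wiring in Async I/O for an IO-bound lookup. The
external call here is a made-up placeholder; the AsyncDataStream API is the one
referenced in [1]:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncLookup extends RichAsyncFunction<String, String> {
    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Hypothetical non-blocking external call; replace with your own async client.
        CompletableFuture
            .supplyAsync(() -> "enriched-" + key)
            .thenAccept(v -> resultFuture.complete(Collections.singleton(v)));
    }
}

// Usage sketch: up to 100 in-flight requests per subtask, 5 s timeout.
// DataStream<String> enriched =
//     AsyncDataStream.unorderedWait(input, new AsyncLookup(), 5, TimeUnit.SECONDS, 100);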

Best,
Zhanghao Chen

From: Ganesh Walse mailto:ganesh.wa...@gmail.com>>
Sent: Friday, March 29, 2024 4:48
To: user@flink.apache.org<mailto:user@flink.apache.org> 
mailto:user@flink.apache.org>>
Subject: One query just for curiosity

Hi Team,
If one record gets processed in 1 second in Flink, then what will be the 
best time taken to process 1000 records in Flink using maximum parallelism?


Re: Flink cache support

2024-03-28 Thread Zhanghao Chen
Hi,

You can maintain a cache manually in your operator implementations. You can 
load the initial cached data on the operator open() method before the 
processing starts. However, this would set up a cache per task instance. If 
you'd like to have a cache shared by all processing tasks without duplication, 
you might set up a Redis service externally for that purpose.
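For illustration, a minimal sketch of a per-task cache loaded in open() before any
record is processed. The JDBC URL, credentials, query, and class name are made-up
placeholders; the loading logic would be your own:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichWithCache extends RichMapFunction<String, String> {
    // One cache per parallel task instance, populated before processing starts.
    private transient Map<String, String> refData;

    @Override
    public void open(Configuration parameters) throws Exception {
        refData = new HashMap<>();
        try (Connection conn = DriverManager.getConnection("jdbc:oracle:thin:@//host:1521/db", "user", "pwd");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT k, v FROM ref_table")) {
            while (rs.next()) {
                refData.put(rs.getString("k"), rs.getString("v"));
            }
        }
    }

    @Override
    public String map(String key) {
        // Look up the cached reference data for each incoming record.
        return refData.getOrDefault(key, "unknown");
    }
}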

Best,
Zhanghao Chen

From: Ganesh Walse 
Sent: Friday, March 29, 2024 4:45
To: user@flink.apache.org 
Subject: Flink cache support

Hi Team,

In my project, my requirement is to cache data from the Oracle database, where 
there are many tables and the same data will be required for processing all the 
transactions.

Can you please suggest an approach where the cache is first loaded in memory 
and then the stream processing starts.

Thanks & regards,
Ganesh Walse.




Re: One query just for curiosity

2024-03-28 Thread Zhanghao Chen
Flink can be scaled up to a parallelism of 32767 at max. And if your record 
processing is mostly IO-bound, you can further boost the throughput via 
Async-IO [1].

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/

Best,
Zhanghao Chen

From: Ganesh Walse 
Sent: Friday, March 29, 2024 4:48
To: user@flink.apache.org 
Subject: One query just for curiosity

Hi Team,
If one record gets processed in 1 second in Flink, then what will be the 
best time taken to process 1000 records in Flink using maximum parallelism?


Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

2024-03-28 Thread Zhanghao Chen
Congratulations!

Best,
Zhanghao Chen

From: Yu Li 
Sent: Thursday, March 28, 2024 15:55
To: d...@paimon.apache.org 
Cc: dev ; user 
Subject: Re: [ANNOUNCE] Apache Paimon is graduated to Top Level Project

CC the Flink user and dev mailing list.

Paimon originated within the Flink community, initially known as Flink
Table Store, and all our incubating mentors are members of the Flink
Project Management Committee. I am confident that the bonds of
enduring friendship and close collaboration will continue to unite the
two communities.

And congratulations all!

Best Regards,
Yu

On Wed, 27 Mar 2024 at 20:35, Guojun Li  wrote:
>
> Congratulations!
>
> Best,
> Guojun
>
> On Wed, Mar 27, 2024 at 5:24 PM wulin  wrote:
>
> > Congratulations~
> >
> > > On March 27, 2024 at 15:54, 王刚 wrote:
> > >
> > > Congratulations~
> > >
> > >> On March 26, 2024 at 10:25, Jingsong Li wrote:
> > >>
> > >> Hi Paimon community,
> > >>
> > >> I’m glad to announce that the ASF board has approved a resolution to
> > >> graduate Paimon into a full Top Level Project. Thanks to everyone for
> > >> your help to get to this point.
> > >>
> > >> I just created an issue to track the things we need to modify [2],
> > >> please comment on it if you feel that something is missing. You can
> > >> refer to apache documentation [1] too.
> > >>
> > >> And, we already completed the GitHub repo migration [3], please update
> > >> your local git repo to track the new repo [4].
> > >>
> > >> You can run the following command to complete the remote repo tracking
> > >> migration.
> > >>
> > >> git remote set-url origin https://github.com/apache/paimon.git
> > >>
> > >> If you have a different name, please change the 'origin' to your remote
> > name.
> > >>
> > >> Please join me in celebrating!
> > >>
> > >> [1]
> > https://incubator.apache.org/guides/transferring.html#life_after_graduation
> > >> [2] https://github.com/apache/paimon/issues/3091
> > >> [3] https://issues.apache.org/jira/browse/INFRA-25630
> > >> [4] https://github.com/apache/paimon
> > >>
> > >> Best,
> > >> Jingsong Lee
> >
> >


Re: Re: Custom data source in Flink 1.19

2024-03-28 Thread Zhanghao Chen
If you are using the DataStream API, you can also check whether the newly added DataGen Connector [1] can directly satisfy your test-data generation needs.


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/
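For illustration, a minimal sketch of generating a custom class at a fixed rate with the
Source-based DataGeneratorSource. The POJO and the rate are made up for the example;
please verify the import paths against your Flink version:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenExample {
    // Hypothetical POJO used only for this sketch.
    public static class MyEvent {
        public long id;
        public String payload;
        public MyEvent() {}
        public MyEvent(long id, String payload) { this.id = id; this.payload = payload; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        GeneratorFunction<Long, MyEvent> generator =
                index -> new MyEvent(index, "payload-" + index);

        DataGeneratorSource<MyEvent> source = new DataGeneratorSource<>(
                generator,
                Long.MAX_VALUE,                          // effectively unbounded
                RateLimiterStrategy.perSecond(100),      // ~100 records/second overall
                TypeInformation.of(MyEvent.class));

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "datagen")
           .print();
        env.execute("datagen-example");
    }
}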

Best,
Zhanghao Chen

From: ha.fen...@aisino.com 
Sent: Thursday, March 28, 2024 15:34
To: user-zh 
Subject: Re: Re: Custom data source in Flink 1.19

What I want to ask is: if we need to implement the Source interface, how should it be written? Is there a concrete example that implements generating a custom class at a certain rate?

From: gongzhongqiang
Sent: 2024-03-28 15:05
To: user-zh
Subject: Re: Custom data source in Flink 1.19
Hello:

In the current Flink 1.19 release, SourceFunction is only marked as deprecated; it will be removed in a future
version. So, to keep your application compatible with Flink versions in the long run, you can re-implement these SourceFunctions with the new Source interface.

ha.fen...@aisino.com wrote on Thursday, March 28, 2024 at 14:18:

>
> Previously we extended SourceFunction to implement some simple methods that automatically generate data. In 1.19 it is already marked as deprecated, and it seems the Source interface should be used instead, which is completely different from the original SourceFunction. How should we generate a custom data source for testing?
>


Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Yu Chen
Congratulations!
Thanks to release managers and everyone involved!

Best,
Yu Chen
 

> On March 19, 2024 at 01:01, Jeyhun Karimov wrote:
> 
> Congrats!
> Thanks to release managers and everyone involved.
> 
> Regards,
> Jeyhun
> 
> On Mon, Mar 18, 2024 at 9:25 AM Lincoln Lee  wrote:
> 
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.19.0, which is the first release for the Apache Flink 1.19 series.
>> 
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> 
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>> 
>> The full release notes are available in Jira:
>> 
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
>> 
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> 
>> 
>> Best,
>> Yun, Jing, Martijn and Lincoln
>> 



Re: [ANNOUNCE] Apache Flink 1.19.0 released

2024-03-18 Thread Yu Chen
Congratulations!
Thanks to release managers and everyone involved!

Best,
Yu Chen
 

> On March 19, 2024 at 01:01, Jeyhun Karimov wrote:
> 
> Congrats!
> Thanks to release managers and everyone involved.
> 
> Regards,
> Jeyhun
> 
> On Mon, Mar 18, 2024 at 9:25 AM Lincoln Lee  wrote:
> 
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink 1.19.0, which is the first release for the Apache Flink 1.19 series.
>> 
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>> 
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>> 
>> Please check out the release blog post for an overview of the improvements
>> for this bugfix release:
>> 
>> https://flink.apache.org/2024/03/18/announcing-the-release-of-apache-flink-1.19/
>> 
>> The full release notes are available in Jira:
>> 
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353282
>> 
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> 
>> 
>> Best,
>> Yun, Jing, Martijn and Lincoln
>> 



Re: Question about setting parallelism and partition count when Flink writes to Kafka

2024-03-13 Thread Zhanghao Chen
Hi,

The strategy for writing to Kafka partitions depends on the Partitioner used by the Kafka Sink [1]. By default, Kafka's Default 
Partitioner is used, which internally uses a so-called sticky partitioning strategy: records with a key are written to the partition chosen by hashing the key; records without 
a key are written to a randomly chosen partition, which is then "stuck to" for a while to improve batching, and once the batch is written, another random partition is chosen. This balances batching efficiency against evenly spread writes.
See [2] for details.

Therefore, with the default configuration, the traversal problem you describe (hurting the batching effect) does not exist. Before you hit the write bottleneck of a single Kafka 
partition, simply increasing the write parallelism will already improve the write throughput quite well. However, in some special cases, for example if your parallelism is very high and the per-subtask write QPS 
is extremely low, so that a single batching interval contains only one or two messages, the batching becomes ineffective and you hit the Kafka write bottleneck; then lowering the parallelism may actually improve throughput by improving the batching effect, and, together with write compression, reduce the 
Kafka write traffic.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#sink-partitioning
[2] https://www.cnblogs.com/huxi2b/p/12540092.html
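For illustration, a sketch of pinning the partitioning strategy explicitly on a SQL Kafka
sink. The table name, topic, and broker address are made up; 'sink.partitioner' is the
option documented in [1]:

CREATE TABLE kafka_sink (
  id BIGINT,
  msg STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my_topic',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'sink.partitioner' = 'fixed'  -- each Flink subtask writes to one partition; omit to keep Kafka's default partitioner
);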



From: chenyu_opensource 
Sent: Wednesday, March 13, 2024 15:25
To: user-zh@flink.apache.org 
Subject: Question about setting parallelism and partition count when Flink writes to Kafka

Hello:
 Flink writes data to Kafka (Kafka as the sink). When the number of partitions of the Kafka 
topic (set to 60) is smaller than the configured parallelism (set to 300), do the tasks write to these partitions in a round-robin fashion, and does that affect write efficiency (is there a traversal cost)?
 In that case, if we increase the topic's partition count (to 200, or directly to 300), will the write efficiency improve noticeably?

 Is there related source code we can look at?
Looking forward to your reply. Best regards, and thanks!





Re: Facing ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig on EMR

2024-03-12 Thread Zhanghao Chen
Hi Sachin,

The Flink 1.8 series has already been out of support; have you tried a newer 
version of Flink?

From: Sachin Mittal 
Sent: Tuesday, March 12, 2024 14:48
To: user@flink.apache.org 
Subject: Facing ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig on EMR

Hi,
We have installed a flink cluster version 1.8.0 on AWS EMR.
However when we submit a job we get the following error:

(Do note that when we submit the same job on a local instance of Flink 1.8.1 it 
is working fine.
The fat jar we submit has all the flink dependencies from 1.8.0 including the 
class org.apache.flink.api.common.ExecutionConfig).


Caused by: java.lang.RuntimeException: 
org.apache.flink.runtime.client.JobInitializationException: Could not start the 
JobMaster.
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
at 
java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
at 
java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
at 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at 
java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at 
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.concurrent.CompletionException: 
java.lang.RuntimeException: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig
at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
... 3 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
... 3 more
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.api.common.ExecutionConfig
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:467)


Re: Re: Running Flink SQL in production

2024-03-08 Thread Zhanghao Chen
Hi Robin,

It's better to use Application mode [1] for mission-critical long-running SQL 
jobs as it provides better isolation. You can use the Table API to package 
a jar, as suggested by Feng, to do so. Neither the SQL Client nor the SQL Gateway 
supports submitting SQL in Application mode for now, but there is some ongoing 
effort [2]. Hopefully, it will become much easier to do so in the future.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/overview/#application-mode

[2] https://issues.apache.org/jira/browse/FLINK-26541
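For illustration, a minimal sketch of packaging a SQL job with the Table API so it can be
submitted as a jar in Application mode. The table definitions are made-up placeholders;
the SQL would normally be your own statements:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlJob {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL for source and sink (illustrative definitions).
        tEnv.executeSql(
                "CREATE TABLE src (id BIGINT, msg STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql(
                "CREATE TABLE snk (id BIGINT, msg STRING) WITH ('connector' = 'blackhole')");

        // The INSERT statement is what actually runs as the streaming job.
        tEnv.executeSql("INSERT INTO snk SELECT id, msg FROM src");
    }
}

The resulting jar can then be submitted like any other job, e.g. with
flink run-application in the deployment mode of your choice.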

Best,
Zhanghao Chen

From: Feng Jin 
Sent: Friday, March 8, 2024 9:46
To: Xuyang 
Cc: Robin Moffatt ; user@flink.apache.org 

Subject: Re: Re: Running Flink SQL in production

Hi,

If you need to use Flink SQL in a production environment, I think it would be 
better to use the Table API [1] and package it into a jar.
Then submit the jar to the cluster environment.

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#sql

Best,
Feng

On Thu, Mar 7, 2024 at 9:56 PM Xuyang 
<xyzhong...@163.com> wrote:
Hi.
Hmm, if I'm mistaken, please correct me. Using a SQL client might not be very 
convenient for those who need to verify the
results of submissions, such as checking for exceptions related to submission 
failures, and so on.


--

Best!
Xuyang


On 2024-03-07 17:32:07, "Robin Moffatt" 
<ro...@decodable.co> wrote:

Thanks for the reply.
In terms of production, my thinking is you'll have your SQL in a file under 
code control. Whether that SQL ends up getting submitted via an invocation of 
SQL Client with -f or via REST API seems moot. WDYT?



On Thu, 7 Mar 2024 at 01:53, Xuyang 
<xyzhong...@163.com> wrote:
Hi, IMO, both the SQL Client and the Restful API can provide connections to the 
SQL Gateway service for submitting jobs. A slight difference is that the SQL 
Client also offers a command-line visual interface for users to view results.
In your production scenes, placing the SQL to be submitted into a file and then 
using the '-f' command in SQL Client to submit the file sounds a bit 
roundabout. You can just use the Restful API to submit them directly?


--

Best!
Xuyang


At 2024-03-07 04:11:01, "Robin Moffatt via user" 
<user@flink.apache.org> wrote:

I'm reading the deployment guide[1] and wanted to check my understanding. For 
deploying a SQL job into production, would the pattern be to write the SQL in a 
file that's under source control, and pass that file as an argument to SQL 
Client with -f argument (as in this docs example[2])?
Or script a call to the SQL Gateway's REST API?

Are there pros and cons to each approach?

thanks, Robin

[1]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/
[2]: 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sqlclient/#execute-sql-files


Re: Reply: How to obtain the job lineage of a Flink DataStream job?

2024-03-08 Thread Zhanghao Chen
It is in fact feasible. You can directly modify the source code of StreamExecutionEnvironment to register a 
customized listener for every job by default, and then expose this information through some channel. In FLIP-314 [1], we plan to provide such an interface natively in Flink so that you can register your own 
listener to obtain lineage information, but it has not been released yet, so for now you can do it yourself.

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-314:+Support+Customized+Job+Lineage+Listener
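For illustration, a minimal sketch of registering a JobListener on the
StreamExecutionEnvironment to capture the job id at submission time. The logging and the
wrapping class are made up; this is the same listener mechanism used by the OpenLineage
integration mentioned elsewhere in this thread:

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LineageListenerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                if (jobClient != null) {
                    // The job id becomes available here and can be attached to lineage metadata.
                    System.out.println("Submitted job with id: " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                // Called when execute() returns (e.g. when a bounded job finishes).
            }
        });

        env.fromElements(1, 2, 3).print();
        env.execute("lineage-listener-example");
    }
}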

From: 阿华田 
Sent: Friday, March 8, 2024 18:47
To: user-zh@flink.apache.org 
Subject: Reply: How to obtain the job lineage of a Flink DataStream job?

We want to modify the source code so that, for any job submitted to our real-time platform, the lineage 
information is obtained when the DAG is initialized. Registering on StreamExecutionEnvironment can only be written inside the job itself, which does not meet our needs.




| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master


On March 8, 2024 at 18:23, Zhanghao Chen wrote:
You can take a look at how OpenLineage integrates with Flink [1]: it registers a 
JobListener on the StreamExecutionEnvironment (through which you can get the JobClient and then the job id). The 
transformation information can then be extracted from the execution environment and processed [2].

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: Reply: How to obtain the job lineage of a Flink DataStream job?



"JobGraph can provide the transformation information" - can the JobGraph directly provide the transformation information? We currently
use reflection on SourceTransformation and SinkTransformation to get the connection info, but we cannot get the Flink job id there.
Can the JobGraph provide both the source/sink connection info and the Flink job id?
| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master


On March 8, 2024 at 16:18, Zhanghao Chen wrote:
The JobGraph has a field that is exactly the job id.

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Reply: How to obtain the job lineage of a Flink DataStream job?

After obtaining the Source or DorisSink information, how do we know which Flink job it comes from? It seems we cannot get the Flink job id.


| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master


On February 26, 2024 at 20:04, Feng Jin wrote:
Through the JobGraph you can obtain the transformation information, and from that the concrete Source or Doris
Sink; then you can use reflection to extract the properties inside them.

You can refer to the OpenLineage [1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, etc.?


Re: Reply: How to obtain the job lineage of a Flink DataStream job?

2024-03-08 Thread Zhanghao Chen
You can take a look at how OpenLineage integrates with Flink [1]: it registers a 
JobListener on the StreamExecutionEnvironment (through which you can get the JobClient and then the job id). The 
transformation information can then be extracted from the execution environment and processed [2].

[1] https://openlineage.io/docs/integrations/flink/
[2] 
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/app/src/main/java/io/openlineage/flink/OpenLineageFlinkJobListener.java


Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 16:48
To: user-zh@flink.apache.org 
Subject: Reply: How to obtain the job lineage of a Flink DataStream job?



"JobGraph can provide the transformation information" - can the JobGraph directly provide the transformation information? We currently
use reflection on SourceTransformation and SinkTransformation to get the connection info, but we cannot get the Flink job id there.
Can the JobGraph provide both the source/sink connection info and the Flink job id?
| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master


On March 8, 2024 at 16:18, Zhanghao Chen wrote:
The JobGraph has a field that is exactly the job id.

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Reply: How to obtain the job lineage of a Flink DataStream job?

After obtaining the Source or DorisSink information, how do we know which Flink job it comes from? It seems we cannot get the Flink job id.


| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master


On February 26, 2024 at 20:04, Feng Jin wrote:
Through the JobGraph you can obtain the transformation information, and from that the concrete Source or Doris
Sink; then you can use reflection to extract the properties inside them.

You can refer to the OpenLineage [1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, etc.?


Re: Reply: How to obtain the job lineage of a Flink DataStream job?

2024-03-08 Thread Zhanghao Chen
The JobGraph has a field that is exactly the job id.

Best,
Zhanghao Chen

From: 阿华田 
Sent: Friday, March 8, 2024 14:14
To: user-zh@flink.apache.org 
Subject: Reply: How to obtain the job lineage of a Flink DataStream job?

After obtaining the Source or DorisSink information, how do we know which Flink job it comes from? It seems we cannot get the Flink job id.


| |
阿华田
|
|
a15733178...@163.com
|
Signature customized by NetEase Mail Master


On February 26, 2024 at 20:04, Feng Jin wrote:
Through the JobGraph you can obtain the transformation information, and from that the concrete Source or Doris
Sink; then you can use reflection to extract the properties inside them.

You can refer to the OpenLineage [1] implementation.


1.
https://github.com/OpenLineage/OpenLineage/blob/main/integration/flink/shared/src/main/java/io/openlineage/flink/visitor/wrapper/FlinkKafkaConsumerWrapper.java


Best,
Feng


On Mon, Feb 26, 2024 at 6:20 PM casel.chen  wrote:

A Flink DataStream job consumes from MySQL CDC, processes the data, and writes to Apache
Doris. Is there a way (from the JobGraph/StreamGraph) to obtain the source/sink
connector information, including the connection string, database name, table name, etc.?


Re: Question about the join conditions in the lookup execution plan when Flink SQL joins dimension tables

2024-03-07 Thread Yu Chen
Hi iasiuide,
Could you share the Flink version and the JDBC connector version you are using? As far as I know, the JDBC 
connector fixed the related issue of lost lookup join conditions in FLINK-33365 [1].

[1] https://issues.apache.org/jira/browse/FLINK-33365

Best regards~

> On March 8, 2024 at 11:02, iasiuide wrote:
> 
> 
> 
> 
> The image may not load; below is the SQL fragment from the image 
> ..
> END AS trans_type,
> 
>  a.div_fee_amt,
> 
>  a.ts
> 
>FROM
> 
>  ods_ymfz_prod_sys_divide_order a
> 
>  LEFT JOIN dim_ymfz_prod_sys_trans_log FOR SYSTEM_TIME AS OF a.proc_time 
> AS b ON a.bg_rel_trans_id = b.bg_rel_trans_id
> 
>  AND b.trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, 'MMdd')
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS c ON b.member_id = c.pk_id
> 
>  AND c.data_source = 'merch'
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS d ON c.agent_id = d.pk_id
> 
>  AND (
> 
>d.data_source = 'ex_agent'
> 
>OR d.data_source = 'agent'
> 
>  ) 
> 
>  LEFT JOIN dim_ptfz_ymfz_merchant_info FOR SYSTEM_TIME AS OF a.proc_time 
> AS d1 ON d.fagent_id = d1.pk_id
> 
>  AND d1.data_source = 'agent'
> 
>WHERE 
> 
>  a.order_state = '2' 
> 
>  AND a.divide_fee_amt > 0
> 
>  ) dat
> 
> WHERE
> 
>  trans_date = DATE_FORMAT (CURRENT_TIMESTAMP, '-MM-dd')
> 
>  AND CHAR_LENGTH(member_id) > 1;
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On 2024-03-08 10:54:19, "iasiuide" wrote:
> 
> 
> 
> 
> 
> In the SQL fragment below,
> ods_ymfz_prod_sys_divide_order  is a kafka source table
> dim_ymfz_prod_sys_trans_log   is a mysql table
> dim_ptfz_ymfz_merchant_info   is a mysql table
> 
> 
> 
> The execution plan fragment from the Flink web UI is as follows:
> 
> [1]:TableSourceScan(table=[[default_catalog, default_database, 
> ods_ymfz_prod_sys_divide_order, watermark=[-(CASE(IS NULL(create_time), 
> 1970-01-01 00:00:00:TIMESTAMP(3), CAST(create_time AS TIMESTAMP(3))), 
> 5000:INTERVAL SECOND)]]], fields=[row_kind, id, sys_date, bg_rel_trans_id, 
> order_state, create_time, update_time, divide_fee_amt, divide_fee_flag])
> +- [2]:Calc(select=[sys_date, bg_rel_trans_id, create_time, 
> IF(SEARCH(row_kind, Sarg[_UTF-16LE'-D', _UTF-16LE'-U']), (-1 * 
> divide_fee_amt), divide_fee_amt) AS div_fee_amt, Reinterpret(CASE(create_time 
> IS NULL, 1970-01-01 00:00:00, CAST(create_time AS TIMESTAMP(3 AS ts], 
> where=[((order_state = '2') AND (divide_fee_amt  0) AND (sys_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), '-MM-dd')))])
>   +- 
> [3]:LookupJoin(table=[default_catalog.default_database.dim_ymfz_prod_sys_trans_log],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[bg_rel_trans_id=bg_rel_trans_id], where=[(trans_date = 
> DATE_FORMAT(CAST(CURRENT_TIMESTAMP() AS TIMESTAMP(9)), 'MMdd'))], 
> select=[sys_date, bg_rel_trans_id, create_time, div_fee_amt, ts, 
> bg_rel_trans_id, pay_type, member_id, mer_name])
>  +- [4]:Calc(select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name], where=[(CHAR_LENGTH(member_id)  1)])
> +- 
> [5]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id], where=[(data_source = 
> 'merch')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, pk_id, agent_id, bagent_id])
>+- [6]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, agent_id, bagent_id])
>   +- 
> [7]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], lookup=[pk_id=agent_id], 
> where=[SEARCH(data_source, Sarg[_UTF-16LE'agent', _UTF-16LE'ex_agent'])], 
> select=[sys_date, create_time, div_fee_amt, ts, pay_type, member_id, 
> mer_name, agent_id, bagent_id, pk_id, bagent_id, fagent_id])
>  +- [8]:Calc(select=[sys_date, create_time, div_fee_amt, ts, 
> pay_type, member_id, mer_name, bagent_id, bagent_id0, fagent_id AS 
> fagent_id0])
> +- 
> [9]:LookupJoin(table=[default_catalog.default_database.dim_ptfz_ymfz_merchant_info],
>  joinType=[LeftOuterJoin], async=[false], 
> lookup=[data_source=_UTF-16LE'agent', pk_id=fagent_id0], where=[(data_source 
> = 'agent')], select=[sys_date, create_time, div_fee_amt, ts, pay_type, 
> member_id, mer_name, bagent_id, bagent_id0, fagent_id0, pk_id, agent_name, 
> bagent_name])
>  
> 
> 
> Why is the condition AND b.trans_date = DATE_FORMAT 
> (CURRENT_TIMESTAMP, 'MMdd') for joining the first dimension table dim_ymfz_prod_sys_trans_log not used as a lookup condition in the execution plan ==> 
> lookup=[bg_rel_trans_id=bg_rel_trans_id],
> while the conditions ON b.member_id = c.pk_id AND 
> c.data_source = 'merch' for joining the second dimension table dim_ptfz_ymfz_merchant_info are all used as lookup conditions ==> 
> lookup=[data_source=_UTF-16LE'merch', pk_id=member_id],
> and, for the third dimension table dim_ptfz_ymfz_merchant_info with ON c.agent_id = d.pk_id AND 
> (d.data_source = 'ex_agent' OR d.data_source = 'agent'), 
> the data_source part of the condition is not used as a lookup condition in the execution plan ==> lookup=[pk_id=agent_id]?
> 

Re: Question about time-based operators with RocksDB backend

2024-03-04 Thread Zhanghao Chen
Hi Gabriele,

I'd recommend building on the existing window functions whenever possible: Flink will automatically handle state management for you, and there is no need to be concerned with state backend details. Incremental aggregation to reduce state size also works out of the box if your use case can be expressed with the reduce/aggregate function pattern, which is important for large windows; a small sketch follows below.

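For illustration, here is a minimal sketch of that pattern (class and field names are made up, not taken from your code): a keyed tumbling event-time window with an AggregateFunction, running on the RocksDB state backend so the window state can grow beyond memory.

```
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Larger-than-memory state is handled transparently by the backend.
env.setStateBackend(new EmbeddedRocksDBStateBackend());

// Assume "events" is a DataStream<Tuple2<String, Double>> of (key, value)
// with timestamps and watermarks already assigned.
events
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.hours(6)))
    // Incremental aggregation: only a (sum, count) accumulator is kept per
    // key and window, instead of buffering every element.
    .aggregate(new AggregateFunction<Tuple2<String, Double>, Tuple2<Double, Long>, Double>() {
        @Override
        public Tuple2<Double, Long> createAccumulator() {
            return Tuple2.of(0.0, 0L);
        }

        @Override
        public Tuple2<Double, Long> add(Tuple2<String, Double> value, Tuple2<Double, Long> acc) {
            return Tuple2.of(acc.f0 + value.f1, acc.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Double, Long> acc) {
            return acc.f1 == 0 ? 0.0 : acc.f0 / acc.f1;
        }

        @Override
        public Tuple2<Double, Long> merge(Tuple2<Double, Long> a, Tuple2<Double, Long> b) {
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    })
    .print();

env.execute("windowed-average");
```

The accumulator state lives in RocksDB automatically; no direct Keyed State API calls are needed.
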
Best,
Zhanghao Chen

From: Gabriele Mencagli 
Sent: Monday, March 4, 2024 19:38
To: user@flink.apache.org 
Subject: Question about time-based operators with RocksDB backend


Dear Flink Community,

I am using Flink with the DataStream API and operators implemented using 
RichedFunctions. I know that Flink provides a set of window-based operators 
with time-based semantics and tumbling/sliding windows.

By reading the Flink documentation, I understand that there is the possibility 
to change the memory backend utilized for storing the in-flight state of the 
operators. For example, using RocksDB for this purpose to cope with a 
larger-than-memory state. If I am not wrong, to transparently change the 
backend (e.g., from in-memory to RocksDB) we have to use a proper API to access 
the state. For example, the Keyed State API with different abstractions such as 
ValueState, ListState, etc... as reported 
here<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/fault-tolerance/state/>.

My question is related to the utilization of time-based window operators with 
the RocksDB backend. Suppose for example very large temporal windows with a 
huge number of keys in the stream. I am wondering if there is a possibility to 
use the built-in window operators of Flink (e.g., with an AggregateFunction or 
a more generic ProcessWindowFunction as 
here<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/windows/>)
 transparently with RocksDB support as a state back-end, or if I have to 
develop the window operator in a raw manner using the Keyed State API (e.g., 
ListState, AggregateState) for this purpose by implementing the underlying 
window logic manually in the code of RichedFunction of the operator (e.g., a 
FlatMap).

Thanks for your support,

--
Gabriele Mencagli


Re: Can JobGraph information be obtained from a Flink job's web URL?

2024-03-03 Thread Zhanghao Chen
Adding to Yanquan's answer: what you get from /jobs/:jobid/plan is in fact the JobGraph information represented as JSON (generated by the JsonPlanGenerator class, and it contains most of the commonly used fields of the JobGraph), which should satisfy your need. See the example below.

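For example (host and job id are placeholders):

```
# Returns the JSON plan of a running job, i.e. a JSON view of its JobGraph
curl http://<jobmanager-host>:8081/jobs/<job-id>/plan
```
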
From: casel.chen 
Sent: Saturday, March 2, 2024 14:17
To: user-zh@flink.apache.org 
Subject: Can JobGraph information be obtained from a Flink job's web URL?

Can the JobGraph information of a running Flink job be obtained via the web URL it exposes?


Re: Unsubscribe

2024-02-21 Thread Zhanghao Chen
Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the user-zh@flink.apache.org mailing list.
邮件组的邮件。

Best,
Zhanghao Chen

From: 曹明勤 
Sent: Thursday, February 22, 2024 9:42
To: user-zh@flink.apache.org 
Subject: Unsubscribe

Unsubscribe


Re: Task Manager getting killed while executing sql queries.

2024-02-16 Thread Zhanghao Chen
Hi Kanchi,

Could you provide more information? For example, at what stage this log is printed (job recovering, running, etc.), along with more detailed job logs or a stack trace.

Best,
Zhanghao Chen

From: Kanchi Masalia via user 
Sent: Friday, February 16, 2024 4:07
To: Neha Rawat 
Cc: Kenan Kılıçtepe ; user@flink.apache.org 
; Liang Mou 
Subject: Re: Task Manager getting killed while executing sql queries.

Hi!

We just encountered a similar issue.

This is usually caused by: 1) Akka failed sending the message silently, due to 
problems like oversized payload or serialization failures. In that case, you 
should find detailed error information in the logs. 2) The recipient needs more 
time for responding, due to problems like slow machines or network jitters. In 
that case, you can try to increase akka.ask.timeout.

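For reference, a minimal sketch of raising that timeout in flink-conf.yaml (the value is only an example; in releases that have migrated from Akka to Pekko, the corresponding pekko.ask.timeout key is used instead):

```
# flink-conf.yaml
akka.ask.timeout: 60 s
```
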
Were you able to resolve the issue? Could you suggest what worked for you?


Thanks,
Kanchi Masalia



On Tue, Aug 29, 2023 at 6:18 AM Neha Rawat 
mailto:n.ra...@netwitness.com>> wrote:

Thanks for your response.

I did check and the memory utilization looks fine.  Attaching a VisualVM 
screenshot. Memory usage was well under the limits.

There are phases of low CPU consumption by Flink (below 10%, with spikes that go up to 100%) and the number of threads goes down as well. That is when I see Errors #1, #2 and #3 as listed in the original email.



Thanks,

Neha



From: Kenan Kılıçtepe mailto:kkilict...@gmail.com>>
Sent: Monday, August 28, 2023 4:25 PM
To: Neha Rawat mailto:n.ra...@netwitness.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Task Manager getting killed while executing sql queries.






Can it be a memory leak? Have you observed the memory consumption of task 
managers?

Once, task manager crush issue happened for me and it was OOM.





On Mon, Aug 28, 2023 at 9:12 PM Neha Rawat 
mailto:n.ra...@netwitness.com>> wrote:

Hi,



Need some help with the below situation. If would be great if someone could 
give some pointers on how to resolve this.



I am trying to execute 80 SQL queries on data coming in from source – Kafka 
Topic Event. The results are written to 2 sinks – Kafka topics – Alert & 
UnsortedAlert.

The input data is coming in at an EPS of about 230K Events/sec.



  *   Kafka was up and running all the time on the same host as flink.
  *   Job  configurations=>  --parallelism 20 --checkpoint-interval 60 
--aligned-checkpoint-timeout 5 --min-pause-checkpoints 5 --table-state-ttl 360
  *   Flink config -
state.backend.rocksdb.localdir: /var/netwitness/flink/rocksdb

state.backend.rocksdb.memory.write-buffer-ratio: 0.75

state.backend.rocksdb.log.level: WARN_LEVEL

state.backend.rocksdb.log.dir: /dev/null

state.backend.rocksdb.log.max-file-size: 1MB

state.backend.rocksdb.log.file-num: 1

state.backend.rocksdb.thread.num: 8

execution.checkpointing.timeout: 20 min



Have also tried the same test with flink config that has changelog enabled, 
have changed min-pause-checkpoints to 50 but the observation remains the same.



Observations-

  *   Checkpointing initially takes less than a second, but as the test 
progresses there are phases (3-4 consecutive checkpoints) where it takes more 
than a minutes and sometimes up to 9 minutes.
  *   Task manager gets killed after a couple of hours.
  *   These are the errors  in taskExecutor logs –



Error 1-

2023-08-22 22:53:59,441 INFO  org.apache.kafka.clients.NetworkClient
   [] - [Consumer clientId=Event-16, groupId=Event] Disconnecting from 
node 1 due to request timeout.

2023-08-22 22:53:59,441 INFO  org.apache.kafka.clients.NetworkClient
   [] - [Consumer clientId=Event-16, groupId=Event] Cancelled in-flight 
FETCH request with correlation id 445553 due to node 1 being disconnected 
(elapsed time since creation: 147696ms, elapsed time since send: 147696ms, 
request timeout: 3ms)

2023-08-22 22:53:59,441 INFO  org.apache.kafka.clients.NetworkClient
   [] - [Consumer clientId=Event-16, groupId=Event] Cancelled in-flight 
METADATA request with correlation id 44 due to node 1 being disconnected 
(elapsed time since creation: 1ms, elapsed time since send: 1ms, request 
timeout: 3ms)

2023-08-22 22:53:59,441 INFO  org.apache.kafka.clients.FetchSessionHandler  
   [] - [Consumer clientId=Event-16, groupId=Event] Error sending fetch 
request (sessionId=1726574973, epoch=367) to node 1:

org.apache.kafka.common.errors.DisconnectException: null

2023-08-22 22:54:06,975 INFO  org.apache.kafka.clients.NetworkClient
   [] - [Consumer clientId=Event-19, groupId=Event] Disconnecting fr

Re: Flink use case feedback request

2024-02-16 Thread Zhanghao Chen
Hi Brent,

Sounds like a good plan to start with. Application mode gives the best isolation level, but it can be costly for a large number of small jobs, as at least 2 containers are required (1 JobManager and 1 TaskManager) for each job. If your jobs are mostly small in size (1 or 2 TMs), you might want to give Session Mode with the operator a try. Jobs can be grouped to run on a few session clusters to achieve higher resource utilization, at the cost of a lower isolation level and more difficult debugging. Running tens of jobs on the same session cluster would be a good choice to start with; a rough sketch follows below.

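As a rough sketch of the session-mode setup with the Kubernetes operator (all names, the image and the jar URI are placeholders; field names should be checked against the operator docs for your version), one shared session cluster plus one FlinkSessionJob per rule could look like this:

```
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: rules-session-a            # shared session cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  serviceAccount: flink
  jobManager:
    resource: { cpu: 1, memory: "2048m" }
  taskManager:
    resource: { cpu: 1, memory: "2048m" }
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: rule-0001                  # one lightweight job per rule
spec:
  deploymentName: rules-session-a  # runs inside the session cluster above
  job:
    jarURI: https://example.com/jars/rule-0001.jar
    parallelism: 2
    upgradeMode: stateless
```
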
Best,
Zhanghao Chen

From: Brent 
Sent: Saturday, February 17, 2024 3:01
To: user@flink.apache.org 
Subject: Flink use case feedback request

Hey everyone,

I've been looking at Flink to handle a fairly complex use case and was hoping 
for some feedback on if the approach I'm thinking about with Flink seems 
reasonable.  When researching what people build on Flink, it seems like a lot 
of the focus tends to be on running fewer heavyweight/complex jobs whereas the 
approach I'm thinking about involves executing many potentially smaller and 
more lightweight jobs.

The core idea is that we have a lot (think 100s or 1000s) of incoming data 
streams (maybe via something like Apache Pulsar) and we have rules, of various 
complexities, that need to be executed against individual streams.  If a rule 
matches, an event needs to be emitted to an output stream.  The rules could be 
as simple as "In any event, if you see field X set to value 'foo', it's a 
match" or more complex like "If you see an event of type A followed by an event 
of type B followed by an event of type C in a certain time window, then it's a 
match."  These rules are long-running (could be hours, days, weeks, or longer).

It *seems* to me like Application Mode 
(https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/overview/)
 with the Kubernetes Operator 
(https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#application-deployments)
 which will create a new cluster per application seems like what I'd want. I'm 
envisioning each of these long-running rules (which potentially each read a 
different data stream) is its own job in its own application (maybe later some 
can be combined, but to start, they'll all be separate).

Does that seem like the right approach to running a number of somewhat small 
jobs concurrently on Flink?  Are there any "gotchas" to this I'm not thinking 
of? Any alternate approaches that are worth considering?  Are there any users 
we know of who do something like this currently?

Thanks for your time and insight!

~Brent


[ANNOUNCE] Apache Celeborn(incubating) 0.4.0 available

2024-02-06 Thread Fu Chen
Hi all,

Apache Celeborn(Incubating) community is glad to announce the
new release of Apache Celeborn(Incubating) 0.4.0.

Celeborn is dedicated to improving the efficiency and elasticity of
different map-reduce engines and provides an elastic, high-efficient
service for intermediate data including shuffle data, spilled data,
result data, etc.


Download Link: https://celeborn.apache.org/download/

GitHub Release Tag:

-
https://github.com/apache/incubator-celeborn/releases/tag/v0.4.0-incubating

Release Notes:

- https://celeborn.apache.org/community/release_notes/release_note_0.4.0


Home Page: https://celeborn.apache.org/

Celeborn Resources:

- Issue Management: https://issues.apache.org/jira/projects/CELEBORN
- Mailing List: d...@celeborn.apache.org

Thanks,
Fu Chen
On behalf of the Apache Celeborn(incubating) community


Re: Parallelism and Target TPS

2024-02-01 Thread Zhanghao Chen
Hi Patricia,

Flink will create one Kafka consumer per parallel subtask; however, you'll need some testing to measure the capacity of a single task. Usually, one consumer can consume at a much higher rate than 1 record per second. A short sketch follows below.

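For illustration, a sketch of a Kafka source whose parallelism is set explicitly (topic, servers and the value 8 are placeholders, and env is assumed to be your StreamExecutionEnvironment); the right number should come from load-testing a single subtask rather than from equating 200 TPS with 200 subtasks:

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
   .setParallelism(8);  // one Kafka consumer instance per parallel subtask
```
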
Best,
Zhanghao Chen

From: patricia lee 
Sent: Thursday, February 1, 2024 15:26
To: user@flink.apache.org 
Subject: Parallelism and Target TPS

Hi,

I have a flink job that consumes from kafka and sinks it to an API. I need to ensure that my flink job stays within the rate limit of 200 TPS. We are planning to increase the parallelism, but I do not know the right number to set. Does a parallelism of 1 equal 1 consumer? So if the limit is 200, should we set the parallelism to 200 as well?

I only created a simple retry that sends again if an HTTP 421 error is received.


Any advise, appreciated. Thanks


Re: Apache Flink lifecycle and Java 17 support

2024-01-28 Thread Zhanghao Chen
Hi Deepti,

Regarding the life cycle of 1.x versions: 1.x will receive its last bugfix version upon the release of 1.(x+2), e.g., 1.16 will receive its last bugfix version upon the release of 1.18. Given that Flink releases a minor version every 6 months, this means each minor version receives bugfixes for about 1 year.

One possible exception is the last version of the 1.x series. There has been 
discussion on making it an LTS version, but no consensus has been reached yet. 
Most likely 1.20 will be the last version of 1.x series and would be an LTS 
version.

For Java 17 support, it is supported in Flink 1.18 but annotated as 
"experimental" so far. I'm not sure on the meaning of "experimental" as well. 
My personal understanding is that it lacks massive production adoption yet, 
hence "experimental". But a few companies have already adopted Java 17 in 
production, including ByteDance as mentioned by Xiangyu.

Best,
Zhanghao Chen

From: Deepti Sharma S via user 
Sent: Friday, January 26, 2024 22:56
To: xiangyu feng 
Cc: user@flink.apache.org 
Subject: RE: Apache Flink lifecycle and Java 17 support


Hello Xiangyu,



Sorry but I am still not clear the life cycle details for 1.x versions?



Do we have any defined end-of-life dates for 1.16, 1.17 and 1.19? Also, will 1.18 be the last minor release of the 1.x series?



Also, what do you mean by experimental support for Java 17? Is Java 17 supported by the Flink community with Flink 1.18? Can we use this combination in our commercial product release?





Regards,

Deepti Sharma




From: xiangyu feng 
Sent: 24 January 2024 18:11
To: Deepti Sharma S 
Cc: user@flink.apache.org
Subject: Re: Apache Flink lifecycle and Java 17 support



Hi Deepti,



For the lifecycle of Flink 1.x version and more information about Flink 2.0, 
pls refer to Flink RoadMap[1] and this discussion thread[2].

Flink 1.18 currently has experimental support for Java17[3].



[1] https://flink.apache.org/what-is-flink/roadmap/

[2] https://lists.apache.org/thread/b8w5cx0qqbwzzklyn5xxf54vw9ymys1c

[3] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/java_compatibility/



Regards,

Xiangyu Feng



Deepti Sharma S via user mailto:user@flink.apache.org>> 
于2024年1月24日周三 20:22写道:

Hello Team,



Can you please let me know the lifecycle for Flink 1.x versions? Also does any 
version supports Java17?





Regards,

Deepti Sharma




Re: Flink autoscaler scaling report

2024-01-18 Thread Yu Chen
Hi Yang,

You can run `StandaloneAutoscalerEntrypoint` [1], and the scaling report will be printed to the log (at INFO level) by LoggingEventHandler [2].

[1] 
flink-kubernetes-operator/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java
 at main · apache/flink-kubernetes-operator (github.com) 
<https://github.com/apache/flink-kubernetes-operator/blob/main/flink-autoscaler-standalone/src/main/java/org/apache/flink/autoscaler/standalone/StandaloneAutoscalerEntrypoint.java>
[2] 
https://github.com/apache/flink-kubernetes-operator/blob/48df9d35ed55ae8bb513d9153e9f6f668da9e1c3/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/LoggingEventHandler.java#L43C18-L43C18

Best,
Yu Chen


> 2024年1月18日 18:20,Yang LI  写道:
> 
> Hello dear flink community,
> 
> I noticed that there's a scaling report feature (specifically, the strings
> defined in AutoscalerEventHandler) in the Flink operator autoscaler.
> However, I'm unable to find this information in the Flink operator logs.
> Could anyone guide me on how to access or visualize this scaling report?
> 
> Thanks,
> Yang



Re: How to monitor changes in the existing files using flink 1.17.2

2024-01-09 Thread Yu Chen
Hi Nitin,

In the Flink file system connector, a collection of already-processed file paths is kept in state and used by Flink to identify whether a file has been processed [1]. So if a file's path has not changed but its content has been updated, it will not be reprocessed.

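For reference, a minimal sketch of the continuous-monitoring FileSource in 1.17 (path and interval are illustrative, and env is assumed to be your StreamExecutionEnvironment); note that monitoring only discovers new paths and does not re-read a path whose content has changed:

```
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

FileSource<String> source = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-bucket/input/"))
        .monitorContinuously(Duration.ofSeconds(30))  // discover *new* files every 30s
        .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
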
Meanwhile, here is another question, should we reprocess all the content in a 
file when it has been updated?
If your answer is Yes, then the easiest way is to have upstream write the data 
to a new file.

If your answer is No, then I think you need a data source that supports incremental reading; consider a data lake format such as Apache Paimon [2] as an alternative to plain files, which gives you more ability to manipulate the data.

[1] 
https://github.com/apache/flink/blob/cad090aaed770c90facb6edbcce57dd341449a02/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/ContinuousFileSplitEnumerator.java#L62C33-L62C54
[2] Overview | Apache Paimon 
<https://paimon.apache.org/docs/master/concepts/overview/>

Best,
Yu Chen


> 2024年1月10日 04:31,Nitin Saini  写道:
> 
> Hi Flink Community,
> 
> I was using flink 1.12.7 readFile to read files from S3; it was able to detect
> both newly added files and updates to existing files.
> 
> But now I have migrated to flink 1.17.2 and am using FileSource to read files
> from S3; it can detect new files being added to S3, but not changes to
> existing files.
> 
> Is there any way in flink 1.17.2 to achieve that functionality as well, i.e. to
> also monitor changes to existing files, by overriding some classes or by doing
> something else?
> 
> Thanks & Regards,
> Nitin Saini



Re: Question about when sliding windows are triggered

2024-01-02 Thread Jinsui Chen
Hi,

Could you post the complete code, especially the watermark-related parts? The advancement of event time is closely tied to the watermark strategy.

Consider the following scenario: the timestamp is used as the event time, and the watermark out-of-orderness tolerance is set to 10 minutes. The behavior you observed would then occur, for the following reasons:
1. First, the window alignment logic. Windows are aligned to the Epoch (1970-01-01 00:00:00
UTC). For example, with a window size of 5 minutes, window start times are round values such as 00:00, 00:05, 00:10 and so on, rather than the event time. This is why your first record falls into the 00:20 - 01:20 window.
2. For event-time windows, window computation fires when the watermark passes the window end time. That means a record with an event time after 01:30 is needed to trigger the 00:20 - 01:20 window. To trigger the 00:25 - 01:25 window corresponding to 1704129661000, a record with an event time greater than 01:35 is required, and no such record exists.

My guess is that your watermark out-of-orderness tolerance is set to a value between 10 and 14 minutes, which led to the situation above. A small illustration follows below.

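For illustration, a watermark strategy with a smaller tolerance would let the 00:20 - 01:20 window fire as soon as the 01:26 record arrives (a sketch; MyEvent, its getter and the 5-minute value are only examples):

```
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy = WatermarkStrategy
        .<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(5))
        // event.getTimestampMillis() is assumed to return the event time in milliseconds
        .withTimestampAssigner((event, recordTs) -> event.getTimestampMillis());
```
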
Best regards,
Jinsui

ha.fen...@aisino.com wrote on Tuesday, January 2, 2024, at 20:17:

>
> The program uses a sliding window, SlidingEventTimeWindows.of(Time.minutes(60), Time.minutes(5)),
> with the processing function aggregate(new AggregateFunctionMethod(), new MyProcess()).
> I feed data in through a socket:
> 2024-01-02 01:19:01  1704129541000
> 2024-01-02 01:21:01  1704129661000
> 2024-01-02 01:26:01  1704129961000
> 2024-01-02 01:29:01  1704130141000
> 2024-01-02 01:34:01  1704130441000
> The first column is the corresponding time, the second is the timestamp I entered into the system.
> The MyProcess function was only triggered when the last record, 1704130441000, was entered, and the output window was
> 2024-01-02 00:20:00.000,2024-01-02 01:20:00.000
> I expected the window function to fire when 1704129661000 was entered, but it did not. So my question is: how exactly does the 5-minute window triggering work?
>


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Chen Yu
Hi Chen,
You should tell Flink which table to insert into with an "INSERT INTO xxx SELECT xxx" statement.

For a single non-insert query, Flink will collect the output to the console automatically, so it also works without an explicit INSERT.

But you must specify the target table explicitly when you need to write data to external storage.

For example:

String relateQuery = "insert into xxx select correlator_id, name, relationship from Correlation";


Best,
Yu Chen


From: Zhanghao Chen 
Sent: Wednesday, December 6, 2023 7:21:50 PM
To: elakiya udhayanan ; user@flink.apache.org 

Subject: Re: Query on using two sinks for a Flink job (Flink SQL)

Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insertion 
statements, and finally calling StatementSet#execute.

Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Re: Query on using two sinks for a Flink job (Flink SQL)

2023-12-06 Thread Zhanghao Chen
Hi Elakiya,

You can try executing TableEnvironmentImpl#executeInternal for non-insert 
statements, then using StatementSet.addInsertSql to add multiple insertion 
statements, and finally calling StatementSet#execute. A self-contained sketch follows below.

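A minimal sketch of that pattern, using built-in test connectors so it runs as-is (table names and schemas are illustrative):

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// Non-insert (DDL) statements can simply be executed one by one.
tEnv.executeSql("CREATE TABLE src (id STRING, name STRING) WITH ('connector' = 'datagen')");
tEnv.executeSql("CREATE TABLE sink_a (id STRING, name STRING) WITH ('connector' = 'print')");
tEnv.executeSql("CREATE TABLE sink_b (id STRING, name STRING) WITH ('connector' = 'blackhole')");

// Group both insertions into one StatementSet so there is a single submission.
StatementSet set = tEnv.createStatementSet();
set.addInsertSql("INSERT INTO sink_a SELECT id, name FROM src");
set.addInsertSql("INSERT INTO sink_b SELECT id, name FROM src");
set.execute();  // one job containing both sinks
```
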
Best,
Zhanghao Chen

From: elakiya udhayanan 
Sent: Wednesday, December 6, 2023 17:49
To: user@flink.apache.org 
Subject: Query on using two sinks for a Flink job (Flink SQL)

Hi Team,
 I would like to know the possibility of having two sinks in a single Flink 
job. In my case I am using the Flink SQL based job where I try to consume from 
two different Kafka topics using the create table (as below) DDL and then use a 
join condition to correlate them and at present write it to an external 
database (PostgreSQL - as a sink). I would like to know if I can add another 
sink where I want to also write it to kafka topic (as the second sink).
I tried using two sql scripts (two create and two insert for the same) but was 
facing an exception "Cannot have more than one execute() or executeAsync() call 
in a single environment. at "
Also tried to use the StatementSet functionality which again gave me an 
exception "org.apache.flink.table.api.TableException: Only insert statement is 
supported now. at ".
I am looking for some help in regards to this. TIA

Note: I am using the Flink UI to submit my job.
Sample DDL statement used:
String statement = "CREATE TABLE Person (\r\n" +
"  person ROW(id STRING, name STRING\r\n" +
"  ),\r\n" +
"  PRIMARY KEY (id) NOT ENFORCED\r\n" +
") WITH (\r\n" +
"  'connector' = 'upsert-kafka',\r\n" +
"  'topic' = 'employee',\r\n" +
"  'properties.bootstrap.servers' = 'kafka-cp-kafka:9092',\r\n" +
"  'key.format' = 'raw',\r\n" +
"  'value.format' = 'avro-confluent',\r\n" +
"  'value.avro-confluent.url' = 'http://kafka-cp-schema-registry:8081',\r\n" +
")";

Thanks,
Elakiya


Re: Flink Kubernetes HA

2023-12-06 Thread Zhanghao Chen
Hi Ethan,

Pekko is basically a fork of Akka from before its license change, so the usage is almost the same. From the exception posted, it looks like you are trying to connect to a terminated dispatcher, which usually indicates some exception on the JobManager side. You can try checking the JM log to find more clues.

"-cluster-config-map" is introduced by the single-leader election change of 
HA. In 1.13, each subcomponent of the JobManager (REST endpoint, dispatcher, resourcemanager and jobmasters) performed leader election separately. In higher versions, a single leader election is performed, with a unified ConfigMap used for that purpose.

Best,
Zhanghao Chen

From: Ethan T Yang 
Sent: Wednesday, December 6, 2023 5:40
To: user@flink.apache.org 
Subject: Flink Kubernetes HA

Hi Flink users,
After upgrading Flink (from 1.13.1 -> 1.18.0), I noticed an issue when HA is enabled (see exception below). I am using a k8s deployment and I cleaned the previous configmaps, like leader files etc. I know Pekko is a recent thing. Can someone share a doc on how to use or set it? When I disable HA, the deployment is successful. I also noticed a new configmap called "-cluster-config-map"; can someone provide a reference on what it is for? I don't see it in the 1.13.1 version.

Thanks a lot
Ivan



org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could 
not send message 
[LocalRpcInvocation(RestfulGateway.requestMultipleJobDetails(Time))] from 
sender [unknown] to recipient [pe

kko.tcp://flink@flink-secondary-jobmanager:6123/user/rpc/dispatcher_1], because 
the recipient is unreachable. This can either mean that the recipient has been 
terminated or that the remote RpcService i

s currently not reachable.

at com.sun.proxy.$Proxy55.requestMultipleJobDetails(Unknown Source) ~[?:?]

at 
org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler.handleRequest(JobsOverviewHandler.java:65)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
 ~[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
 ~[flink-dist-1.18.0.jar:1.18.0]

at java.util.Optional.ifPresent(Unknown Source) [?:?]

at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 [flink-dist-1.18.0.jar:1.18.0]

at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 [flink-dist-1.18

Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-12-05 Thread Mason Chen
Hi Rui,

Sorry for the late reply. I was suggesting that perhaps we could do some
testing with Kubernetes wrt configuring values for the exponential restart
strategy. We've noticed that the default strategy in 1.17 caused a lot of
requests to the K8s API server for unstable deployments.

However, people in different Kubernetes setups will have different limits
so it would be challenging to provide a general benchmark. Another thing I
found helpful in the past is to refer to Kubernetes--for example, the
default strategy is exponential for pod restarts and we could draw
inspiration from what they have set as a general purpose default config.

Best,
Mason

On Sun, Nov 19, 2023 at 9:43 PM Rui Fan <1996fan...@gmail.com> wrote:

> Hi David and Mason,
>
> Thanks for your feedback!
>
> To David:
>
> > Given that the new default feels more complex than the current behavior,
> if we decide to do this I think it will be important to include the
> rationale you've shared in the documentation.
>
> Sounds make sense to me, I will add the related doc if we
> update the default strategy.
>
> To Mason:
>
> > I suppose we could do some benchmarking on what works well for the
> resource providers that Flink relies on e.g. Kubernetes. Based on
> conferences and blogs,
> > it seems most people are relying on Kubernetes to deploy Flink and the
> restart strategy has a large dependency on how well Kubernetes can scale to
> requests to redeploy the job.
>
> Sorry, I didn't understand what type of benchmarking
> we should do, could you elaborate on it? Thanks a lot.
>
> Best,
> Rui
>
> On Sat, Nov 18, 2023 at 3:32 AM Mason Chen  wrote:
>
>> Hi Rui,
>>
>> I suppose we could do some benchmarking on what works well for the
>> resource providers that Flink relies on e.g. Kubernetes. Based on
>> conferences and blogs, it seems most people are relying on Kubernetes to
>> deploy Flink and the restart strategy has a large dependency on how well
>> Kubernetes can scale to requests to redeploy the job.
>>
>> Best,
>> Mason
>>
>> On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
>> wrote:
>>
>>> Rui,
>>>
>>> I don't have any direct experience with this topic, but given the
>>> motivation you shared, the proposal makes sense to me. Given that the new
>>> default feels more complex than the current behavior, if we decide to do
>>> this I think it will be important to include the rationale you've shared in
>>> the documentation.
>>>
>>> David
>>>
>>> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>>>
>>>> Hi dear flink users and devs:
>>>>
>>>> FLIP-364[1] intends to make some improvements to restart-strategy
>>>> and discuss updating some of the default values of exponential-delay,
>>>> and whether exponential-delay can be used as the default
>>>> restart-strategy.
>>>> After discussing at dev mail list[2], we hope to collect more feedback
>>>> from Flink users.
>>>>
>>>> # Why does the default restart-strategy need to be updated?
>>>>
>>>> If checkpointing is enabled, the default value is fixed-delay with
>>>> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>>>> the job will restart infinitely with high frequency when a job
>>>> continues to fail.
>>>>
>>>> When the Kafka cluster fails, a large number of flink jobs will be
>>>> restarted frequently. After the kafka cluster is recovered, a large
>>>> number of high-frequency restarts of flink jobs may cause the
>>>> kafka cluster to avalanche again.
>>>>
>>>> Considering the exponential-delay as the default strategy with
>>>> a couple of reasons:
>>>>
>>>> - The exponential-delay can reduce the restart frequency when
>>>>   a job continues to fail.
>>>> - It can restart a job quickly when a job fails occasionally.
>>>> - The restart-strategy.exponential-delay.jitter-factor can avoid r
>>>>   estarting multiple jobs at the same time. It’s useful to prevent
>>>>   avalanches.
>>>>
>>>> # What are the current default values[4] of exponential-delay?
>>>>
>>>> restart-strategy.exponential-delay.initial-backoff : 1s
>>>> restart-strategy.exponential-delay.backoff-multiplier : 2.0
>>>> restart-strategy.exponential-delay.jitter-factor : 0.1
>>>> restart-strategy.exponential-delay.max-backoff : 5 min
>>>> restart-strat

Re: oomkill issue

2023-12-04 Thread Yu Chen
Hi Prashant,
Can you describe the steps you use to run `jeprof` in detail?

In my case, I did it by logging in to the TaskManager's shell and running the commands there, so I am confused that a curl operation shows up in the error log you provided.

Also, it is true that RocksDB's memory control is not perfect and it is possible to exceed the managed memory limit; you can refer to the documentation [1] for more details.

[1] Write buffer manager internals - Google Docs 
<https://docs.google.com/document/d/1_4Brwy2Axzzqu7SJ4hLLl92hVCpeRlVEG-fj8KsTMUo/edit#heading=h.f5wfmsmpemd0>

Best,
Yu Chen

> 2023年12月5日 04:42,prashant parbhane  写道:
> 
> Hi Yu,
> 
> Thanks for your reply.
> 
> When i run below script
> 
> ```
> jeprof --show_bytes -svg `which java` /tmp/jeprof.out.301.1009.i1009.heap  > 
> 1009.svg
> ```
> i am getting below error
> 
> ```
> Gathering CPU profile from http:///pprof/profile?seconds=30 for 30 seconds to
>   /root/jeprof/java.1701718686.
> Be patient...
> Failed to get profile: curl -s --fail --max-time 90 
> 'http:///pprof/profile?seconds=30' > /root/jeprof/.tmp.java.1701718686.: No 
> such file or directory
> ```
> Any input on this?
> 
> However, oomkill was resolve with below rocksdb configurations
> • "state.backend.rocksdb.memory.managed": "false", 
> "state.backend.rocksdb.block.cache-size": "10m", 
> "state.backend.rocksdb.writebuffer.size": "128m",
> "state.backend.rocksdb.writebuffer.count": "134217728"
> "state.backend.rocksdb.ttl.compaction.filter.enabled":"true"
> 
> 
> Thanks,
> Prashant
> 
> On Mon, Nov 27, 2023 at 7:11 PM Xuyang  wrote:
> Hi, Prashant.
> I think Yu Chen has given some professional troubleshooting ideas. Another thing 
> I want to ask is whether you use some 
> user-defined function to store some objects? You could first dump the memory 
> and get more details to check for memory leaks.
> 
> --
> Best!
> Xuyang
> On 2023-11-28 09:12:01, "Yu Chen" wrote:
> Hi Prashant,
> 
> OOMkill was mostly caused by workset memory exceed the pod limit. 
> We have to first expand the OVERHEAD memory properly by the following params 
> to observe if the problem can be solved.
> ```
> taskmanager.memory.jvm-overhead.max=1536m
> taskmanager.memory.jvm-overhead.min=1536m
> ```
> 
> And if the OOMKill still exists, we need to suspect if the task has an 
> off-heap memory leak.
> One of the most popular tools, jemallc, is recommended. You have to install 
> the jemalloc in the image arrording to the document[1].
> After that, you can enable jemalloc profiling by setting environment for the 
> taskmanager:
> ```
> containerized.taskmanager.env.MALLOC_CONF=prof:true,lg_prof_interval:30,lg_prof_sample:16,prof_prefix:/tmp/jeprof.out
>   ```
> After running for a while, you can log into the Taskmanager to generate svg 
> files to troubleshoot off-heap memory distribution.
> ```
> jeprof --show_bytes -svg `which java` /tmp/jeprof.out.301.1009.i1009.heap  > 
> 1009.svg
> ```
> 
> Otherwise, if the OOMKill no longer occurs, but the GC overhead limit 
> exceeded, then you should dump heap memory to find out what objects are 
> taking up so much of the memory.
> Here is the command for you.
> ```
> jmap -dump:live,format=b,file=/tmp/heap.hprof 
> ```
> 
> [1] Using jemalloc to Optimize Memory Allocation — Sentieon Appnotes 
> 202308.01 documentation
> 
> Best,
> Yu Chen
> From: prashant parbhane 
> Sent: November 28, 2023 1:42
> To: user@flink.apache.org 
> Subject: oomkill issue   Hello,  
> 
> We have been facing this oomkill issue, where task managers are getting 
> restarted with this error.
> I am seeing memory consumption increasing in a linear manner, i have given 
> memory and CPU as high as possible but still facing the same issue.
> 
> We are using rocksdb for the state backend, is there a way to find which 
> operator causing this issue? or find which operator takes more memory? Any 
> good practice that we can follow? We are using broadcast state.
> 
> Thanks,
> Prashant



Re: oomkill issue

2023-11-27 Thread Yu Chen
Hi Prashant,

OOMKills are mostly caused by the working-set memory exceeding the pod limit.
First, increase the JVM overhead memory via the following parameters and observe whether the problem is solved.
```
taskmanager.memory.jvm-overhead.max=1536m
taskmanager.memory.jvm-overhead.min=1536m
```

If the OOMKill still occurs, we need to suspect an off-heap memory leak in the task.
One of the most popular tools, jemalloc, is recommended. You have to install jemalloc in the image according to the document [1].
After that, you can enable jemalloc profiling by setting an environment variable for the taskmanager:
```

containerized.taskmanager.env.MALLOC_CONF=prof:true,lg_prof_interval:30,lg_prof_sample:16,prof_prefix:/tmp/jeprof.out

```
After running for a while, you can log into the Taskmanager to generate svg 
files to troubleshoot off-heap memory distribution.
```
jeprof --show_bytes -svg `which java` /tmp/jeprof.out.301.1009.i1009.heap  > 
1009.svg
```

Otherwise, if the OOMKill no longer occurs but you see "GC overhead limit exceeded", then you should dump the heap memory to find out which objects are taking up so much of it.
Here is the command for you.
```
jmap -dump:live,format=b,file=/tmp/heap.hprof 
```

[1] Using jemalloc to Optimize Memory Allocation ― Sentieon Appnotes 202308.01 
documentation<https://support.sentieon.com/appnotes/jemalloc/>

Best,
Yu Chen

From: prashant parbhane 
Sent: November 28, 2023 1:42
To: user@flink.apache.org 
Subject: oomkill issue

Hello,

We have been facing this oomkill issue, where task managers are getting 
restarted with this error.
I am seeing memory consumption increasing in a linear manner, i have given 
memory and CPU as high as possible but still facing the same issue.

We are using rocksdb for the state backend, is there a way to find which 
operator causing this issue? or find which operator takes more memory? Any good 
practice that we can follow? We are using broadcast state.

Thanks,
Prashant


Re: Flink 1.18 and java 17

2023-11-27 Thread Zhanghao Chen
Hi Lasse,

The default flink-conf.yaml file bundled in the distribution should already 
have a preset env.java.opts.all config for Java 17. Have you tried that?

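For reference, the relevant entry looks roughly like this (an abbreviated sketch, not the complete list; copy the full --add-opens/--add-exports set from the default flink-conf.yaml shipped with the 1.18 distribution, and pass the same flags to the JVM that runs your MiniCluster tests, e.g. via the surefire argLine):

```
# flink-conf.yaml (abbreviated sketch)
env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED ...
```
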
Best,
Zhanghao Chen

From: Lasse Nedergaard 
Sent: Monday, November 27, 2023 21:20
To: user 
Subject: Flink 1.18 and java 17

Hi

I need some help to figure out how to get Flink 1.18 running on Java 17

According to the documentation for Java compatibility I have to set env.java.opts.all, as I use data types and generic lists and maps from the JDK.

I need to configure it so it works both for tests using a MiniCluster and for our fat jar in application mode.

I can't find any information on how to do the configuration and what the configuration should be.

If anyone can help me here it would be much appreciated.

Med venlig hilsen / Best regards
Lasse Nedergaard



Re: Operator ids

2023-11-26 Thread Yu Chen
Hi rania,

Through the following REST API, you can get the vertex metrics (a vertex corresponds to a chain of operators):

GET http://localhost:8081/jobs/<job-id>/vertices/<vertex-id>
Note that the vertex id can be obtained from GET http://localhost:8081/jobs/<job-id>

However, there is no interface for getting operator-level metrics yet.
I was planning to add such an interface; you can follow the ticket 
FLINK-33230 [1].

[1] [FLINK-33230] Support Expanding ExecutionGraph to StreamGraph in Web UI - 
ASF JIRA (apache.org)<https://issues.apache.org/jira/browse/FLINK-33230>

Best,
Yu Chen

From: rania duni 
Sent: November 27, 2023 0:25
To: Yu Chen 
Subject: Re: Operator ids

Thank you for answering! I want the operator ids to get the "records out" metric in cases where data is split within a task. I am developing a scaling algorithm for my thesis, so I need this metric.

On 26 Nov 2023, at 3:01 PM, Yu Chen wrote:


Hi rania,

If you mean the Job Vertex ID of the JobGraph, you can try this:
http://localhost:8081/jobs/<job-id>

Best,
Yu Chen

From: Zhanghao Chen 
Sent: November 26, 2023 11:02
To: rania duni ; user@flink.apache.org 

Subject: Re: Operator ids

It is not supported yet. Curious why you need to get the operator IDs? They are usually only used internally.

Best,
Zhanghao Chen

From: rania duni 
Sent: Saturday, November 25, 2023 20:44
To: user@flink.apache.org 
Subject: Operator ids

Hello!

I would like to know how I can get the operator ids of a running job. I know how to get the task ids, but I want the operator ids! I couldn't find anything in the REST API docs.
Thank you.


Re: Doubts about state and table API

2023-11-26 Thread Yu Chen
Hi Oscar,

The operator IDs of a SQL job are generated by `StreamingJobGraphGenerator` and are related to the topology of the stream graph.
If you would like to confirm whether the problem was caused by a change of operator ids, please remove --allowNonRestoredState; you will then get an exception naming the operator id that failed to restore.

However, the loss of that operator state would only produce some erroneous results and would not result in `not able to return any row`. It would be better to provide the logs after restoring to locate a more specific problem.

Best,
Yu Chen

From: Oscar Perez via user 
Sent: November 25, 2023 0:08
To: Oscar Perez via user 
Subject: Doubts about state and table API

Hi,

We are having a job in production where we use table API to join multiple 
topics. The query looks like this:


SELECT *
FROM topic1 AS t1
JOIN topic2 AS t2 ON t1.userId = t2.userId
JOIN topic3 AS t3 ON t1.userId = t3.accountUserId


This works and produces an EnrichedActivity any time any of the topics receives 
a new event, which is what we expect. This SQL query is linked to a processor 
function and the processElement gets triggered whenever a new EnrichedActivity 
occurs

We have experienced an issue a couple of times in production where we have 
deployed a new version from savepoint and then suddenly we stopped receiving 
EnrichedActivities in the process function.

Our assumption is that this is related to the table API state and that some 
operators are lost from going from one savepoint to new deployment.

Let me illustrate with one example:

version A of the job is deployed
version B of the job is deployed

version B UID for some table api operators changes and this operator is removed 
when deploying version B as it is unable to be mapped (we have the 
--allowNonRestoredState enabled)

The state for the table API stores both the committed offsets and the contents of the topics, but just the contents are lost while the committed offsets are kept.

Therefore, when doing the join of the query, it is not able to return any row 
as it is unable to get data from topic2 or topic 3.

Can this be the case?
We are having a hard time trying to understand how the table api and state 
works internally so any help in this regard would be truly helpful!

Thanks,
Oscar




Re: Operator ids

2023-11-26 Thread Yu Chen
Hi rania,

If you mean the Job Vertex ID of the JobGraph, you can try this:
http://localhost:8081/jobs/<job-id>

Best,
Yu Chen

From: Zhanghao Chen 
Sent: November 26, 2023 11:02
To: rania duni ; user@flink.apache.org 

Subject: Re: Operator ids

It is not supported yet. Curious why you need to get the operator IDs? They are usually only used internally.

Best,
Zhanghao Chen

From: rania duni 
Sent: Saturday, November 25, 2023 20:44
To: user@flink.apache.org 
Subject: Operator ids

Hello!

I would like to know how I can get the operator ids of a running job. I know how to get the task ids, but I want the operator ids! I couldn't find anything in the REST API docs.
Thank you.


Re: Unsubscribe

2023-11-25 Thread Zhanghao Chen
Hello,

Please send an email with any content to user-zh-unsubscr...@flink.apache.org to unsubscribe from the mailing list.

Best,
Zhanghao Chen

From: 唐凯 
Sent: Saturday, November 25, 2023 9:23
To: user-zh 
Subject: Unsubscribe

Unsubscribe






Re: Operator ids

2023-11-25 Thread Zhanghao Chen
It is not supported yet. Curious why you need to get the operator IDs? They are usually only used internally.

Best,
Zhanghao Chen

From: rania duni 
Sent: Saturday, November 25, 2023 20:44
To: user@flink.apache.org 
Subject: Operator ids

Hello!

I would like to know how I can get the operator ids of a running job. I know how to get the task ids, but I want the operator ids! I couldn't find anything in the REST API docs.
Thank you.


Re: How can a Flink SQL job support a configuration stream?

2023-11-20 Thread Yu Chen
Hi casel,

We do something similar in production. You could implement a UDTF (or a filter UDF) that subscribes to the Apollo configuration and decides, based on that configuration, whether to filter out a record. A rough sketch follows below.

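For illustration, a rough sketch of such a function, written here as a scalar filter UDF (the Apollo client calls and the config key are assumptions based on the standard Apollo Java client, and all names are made up):

```
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigService;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

/** Returns true when the customer id is in the allow-list maintained in Apollo. */
public class CustomerAllowed extends ScalarFunction {

    private transient volatile Set<String> allowed = Collections.emptySet();

    @Override
    public void open(FunctionContext context) {
        Config config = ConfigService.getAppConfig();  // assumed Apollo client on the classpath
        refresh(config.getProperty("allowed.customer.ids", ""));
        // Apollo pushes changes, so the allow-list refreshes without restarting the job.
        config.addChangeListener(event ->
                refresh(config.getProperty("allowed.customer.ids", "")));
    }

    public Boolean eval(String customerId) {
        return allowed.contains(customerId);
    }

    private void refresh(String csv) {
        allowed = new HashSet<>(Arrays.asList(csv.split(",")));
    }
}

// Register with tEnv.createTemporarySystemFunction("customer_allowed", CustomerAllowed.class),
// then use it in SQL:
//   SELECT * FROM biz_table WHERE customer_allowed(customer_id)
```
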
Best,
Yu Chen


> On November 20, 2023, at 21:05, Xuyang wrote:
> 
> Hi, 
> Could you replace this "configuration dimension table" with a stream table? Using Flink CDC, changes to the configuration table can be captured as a changelog and then joined with the main stream downstream.
> 
> 
> 
> 
> --
> 
>Best!
>Xuyang
> 
> 
> 
> 
> 
> On 2023-11-20 19:24:47, "casel.chen" wrote:
>> I have a Flink SQL job whose filter condition on customer_id needs to be determined by user configuration, similar to the Apollo config center. Can this be achieved by defining a configuration dimension table, with a TTL set so that the latest configuration is fetched periodically?
>> 
>> 
>> create table customer_conf_tbl (
>> customer_id STRING
>> ) with (
>> 'connector' = 'apollo',
>> 'other properties...' 
>> );
>> select * from biz_table where customer_id in (select 
>> string_split(customer_id, ',') from customer_conf_tbl)
>> 
>> 
>> And if the configuration should be updated in real time and take effect on the running SQL job, how could that be implemented?



Re: [DISCUSS] Change the default restart-strategy to exponential-delay

2023-11-17 Thread Mason Chen
Hi Rui,

I suppose we could do some benchmarking on what works well for the resource
providers that Flink relies on e.g. Kubernetes. Based on conferences and
blogs, it seems most people are relying on Kubernetes to deploy Flink and
the restart strategy has a large dependency on how well Kubernetes can
scale to requests to redeploy the job.

Best,
Mason

On Fri, Nov 17, 2023 at 10:07 AM David Anderson 
wrote:

> Rui,
>
> I don't have any direct experience with this topic, but given the
> motivation you shared, the proposal makes sense to me. Given that the new
> default feels more complex than the current behavior, if we decide to do
> this I think it will be important to include the rationale you've shared in
> the documentation.
>
> David
>
> On Wed, Nov 15, 2023 at 10:17 PM Rui Fan <1996fan...@gmail.com> wrote:
>
>> Hi dear flink users and devs:
>>
>> FLIP-364[1] intends to make some improvements to restart-strategy
>> and discuss updating some of the default values of exponential-delay,
>> and whether exponential-delay can be used as the default restart-strategy.
>> After discussing at dev mail list[2], we hope to collect more feedback
>> from Flink users.
>>
>> # Why does the default restart-strategy need to be updated?
>>
>> If checkpointing is enabled, the default value is fixed-delay with
>> Integer.MAX_VALUE restart attempts and '1 s' delay[3]. It means
>> the job will restart infinitely with high frequency when a job
>> continues to fail.
>>
>> When the Kafka cluster fails, a large number of flink jobs will be
>> restarted frequently. After the kafka cluster is recovered, a large
>> number of high-frequency restarts of flink jobs may cause the
>> kafka cluster to avalanche again.
>>
>> Considering the exponential-delay as the default strategy with
>> a couple of reasons:
>>
>> - The exponential-delay can reduce the restart frequency when
>>   a job continues to fail.
>> - It can restart a job quickly when a job fails occasionally.
>> - The restart-strategy.exponential-delay.jitter-factor can avoid r
>>   estarting multiple jobs at the same time. It’s useful to prevent
>>   avalanches.
>>
>> # What are the current default values[4] of exponential-delay?
>>
>> restart-strategy.exponential-delay.initial-backoff : 1s
>> restart-strategy.exponential-delay.backoff-multiplier : 2.0
>> restart-strategy.exponential-delay.jitter-factor : 0.1
>> restart-strategy.exponential-delay.max-backoff : 5 min
>> restart-strategy.exponential-delay.reset-backoff-threshold : 1h
>>
>> backoff-multiplier=2 means that the delay time of each restart
>> will be doubled. The delay times are:
>> 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 300s, 300s, etc.
>>
>> The delay time is increased rapidly, it will affect the recover
>> time for flink jobs.
>>
>> # Option improvements
>>
>> We think the backoff-multiplier between 1 and 2 is more sensible,
>> such as:
>>
>> restart-strategy.exponential-delay.backoff-multiplier : 1.2
>> restart-strategy.exponential-delay.max-backoff : 1 min
>>
>> After updating, the delay times are:
>>
>> 1s, 1.2s, 1.44s, 1.728s, 2.073s, 2.488s, 2.985s, 3.583s, 4.299s,
>> 5.159s, 6.191s, 7.430s, 8.916s, 10.699s, 12.839s, 15.407s, 18.488s,
>> 22.186s, 26.623s, 31.948s, 38.337s, etc
>>
>> They achieve the following goals:
>> - When restarts are infrequent in a short period of time, flink can
>>   quickly restart the job. (For example: the retry delay time when
>>   restarting 5 times is 2.073s)
>> - When restarting frequently in a short period of time, flink can
>>   slightly reduce the restart frequency to prevent avalanches.
>>   (For example: the retry delay time when retrying 10 times is 5.1 s,
>>   and the retry delay time when retrying 20 times is 38s, which is not
>> very
>> large.)
>>
>> As @Mingliang Liu   mentioned at dev mail list: the
>> one-size-fits-all
>> default values do not exist. So our goal is that the default values
>> can be suitable for most jobs.
>>
>> Looking forward to your thoughts and feedback, thanks~
>>
>> [1] https://cwiki.apache.org/confluence/x/uJqzDw
>> [2] https://lists.apache.org/thread/5cgrft73kgkzkgjozf9zfk0w2oj7rjym
>> [3]
>>
>> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/config/#restart-strategy-type
>> [4]
>>
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#exponential-delay-restart-strategy
>>
>> Best,
>> Rui
>>
>


Re: Increasing maximum number of FlinkDeployments that the Operator can handle

2023-11-08 Thread Tony Chen
Currently, 16GB of heap size is allocated to the flink-kubernetes-operator
container by setting *jvmArgs.operator*, and this didn't help either.

On Wed, Nov 8, 2023 at 5:56 PM Tony Chen  wrote:

> Hi Flink Community,
>
> This is a follow-up on a previous email thread (see email thread below).
> After changing the number of operator pods to 1, although we didn't
> encounter the multiple leaders issue anymore, our singleton operator pod
> restarts whenever we have 150+ FlinkDeployments. Sometimes, the operator
> pod would be stuck CrashLoopBackOff.
>
> We changed the parameter kubernetes.operator.reconcile.parallelism to -1,
> but this didn't help. Are there other parameters (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/configuration/#system-configuration)
> we can tune so that the operator can handle more FlinkDeployments?
>
> Thanks,
> Tony
>
>
> On Fri, Nov 3, 2023 at 5:16 AM Nicolas Fraison <
> nicolas.frai...@datadoghq.com> wrote:
>
>> Hi,
>>
>> We have faced a similar issue with flink kubernetes operator, having
>> multiple operators running as leader at the same time.
>> On our side the issue was on the java-operator-sdk which was not well
>> killing operator that lost leadership (
>> https://github.com/operator-framework/java-operator-sdk/issues/2056).
>> The issue has been solved in java-operator-sdk 4.4.4 and version has been
>> bumped in flink kubernetes operator (
>> https://issues.apache.org/jira/browse/FLINK-33125/https://github.com/apache/flink-kubernetes-operator/pull/680
>> ).
>> But this patch will probably only be provided in flink kubernetes operator
>> 1.17, so I would recommend not relying on multiple operators, or patching locally
>>
>> Nicolas
>>
>> On Fri, Nov 3, 2023 at 9:57 AM Evgeniy Lyutikov 
>> wrote:
>>
>>> Hello!
>>> I constantly get a similar error when the operator (running as a single
>>> instance) receives deployment statuses
>>> Details described in this message
>>> https://lists.apache.org/thread/0odcc9pvlpz1x9y2nop9dlmcnp9v1696
>>> I tried changing versions and allocated resources, as well as the number
>>> of reconcile threads, but nothing helped
>>>
>>> --
>>> *From:* Tony Chen 
>>> *Sent:* November 3, 2023, 9:13:51
>>> *To:* user@flink.apache.org
>>> *Cc:* Nathan Moderwell
>>> *Subject:* Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+
>>> FlinkDeployments concurrently
>>>
>>> One of the operator pods logged the following exception before the
>>> container restarted:
>>>
>>> 2023-11-01 14:24:21,260
>>> o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a
>>> channel whose registration task was not accepted by an event loop: [id:
>>> 0x1a7718c1]
>>> java.util.concurrent.RejectedExecutionException: event executor
>>> terminated
>>>
>>> I did notice that all of our 3 operator pods were reconciling
>>> FlinkDeployments, and this definitely is an issue. After I churned 2 of the
>>> pods, there was only 1 pod that was the leader, and this operator pod was
>>> able to reconcile SPECCHANGES of FlinkDeployments again.
>>>
>>> Are there any recommendations on how I can enforce only 1 pod to be the
>>> leader? For example, would increasing the lease-duration help?
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
>>>
>>> On Wed, Nov 1, 2023 at 11:16 PM Tony Chen 
>>> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> I am currently running flink-kubernetes-operator 1.6-patched (
>>>> https://github.com/apache/flink-kubernetes-operator/commit/3f0dc2ee5534084bc162e6deaded36e93bb5e384

Increasing maximum number of FlinkDeployments that the Operator can handle

2023-11-08 Thread Tony Chen
Hi Flink Community,

This is a follow-up on a previous email thread (see email thread below).
After changing the number of operator pods to 1, although we didn't
encounter the multiple leaders issue anymore, our singleton operator pod
restarts whenever we have 150+ FlinkDeployments. Sometimes, the operator
pod would be stuck CrashLoopBackOff.

We changed the parameter kubernetes.operator.reconcile.parallelism to -1,
but this didn't help. Are there other parameters (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.6/docs/operations/configuration/#system-configuration)
we can tune so that the operator can handle more FlinkDeployments?

Thanks,
Tony


On Fri, Nov 3, 2023 at 5:16 AM Nicolas Fraison <
nicolas.frai...@datadoghq.com> wrote:

> Hi,
>
> We have faced a similar issue with flink kubernetes operator, having
> multiple operators running as leader at the same time.
> On our side the issue was on the java-operator-sdk which was not well
> killing operator that lost leadership (
> https://github.com/operator-framework/java-operator-sdk/issues/2056).
> The issue has been solved in java-operator-sdk 4.4.4 and version has been
> bumped in flink kubernetes operator (
> https://issues.apache.org/jira/browse/FLINK-33125/https://github.com/apache/flink-kubernetes-operator/pull/680
> ).
> But this patch will probably only be provided in flink kubernetes operator
> 1.17, so I would recommend not relying on multiple operators, or patching locally
>
> Nicolas
>
> On Fri, Nov 3, 2023 at 9:57 AM Evgeniy Lyutikov 
> wrote:
>
>> Hello!
>> I constantly get a similar error when the operator (running as a single
>> instance) receives deployment statuses
>> Details described in this message
>> https://lists.apache.org/thread/0odcc9pvlpz1x9y2nop9dlmcnp9v1696
>> I tried changing versions and allocated resources, as well as the number
>> of reconcile threads, but nothing helped
>>
>> --
>> *From:* Tony Chen 
>> *Sent:* November 3, 2023, 9:13:51
>> *To:* user@flink.apache.org
>> *Cc:* Nathan Moderwell
>> *Subject:* Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+
>> FlinkDeployments concurrently
>>
>> One of the operator pods logged the following exception before the
>> container restarted:
>>
>> 2023-11-01 14:24:21,260
>> o.a.f.s.n.i.n.c.AbstractChannel [WARN ] Force-closing a
>> channel whose registration task was not accepted by an event loop: [id:
>> 0x1a7718c1]
>> java.util.concurrent.RejectedExecutionException: event executor terminated
>>
>> I did notice that all of our 3 operator pods were reconciling
>> FlinkDeployments, and this definitely is an issue. After I churned 2 of the
>> pods, there was only 1 pod that was the leader, and this operator pod was
>> able to reconcile SPECCHANGES of FlinkDeployments again.
>>
>> Are there any recommendations on how I can enforce only 1 pod to be the
>> leader? For example, would increasing the lease-duration help?
>>
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/
>>
>> On Wed, Nov 1, 2023 at 11:16 PM Tony Chen 
>> wrote:
>>
>>> Hi Flink Community,
>>>
>>> I am currently running flink-kubernetes-operator 1.6-patched (
>>> https://github.com/apache/flink-kubernetes-operator/commit/3f0dc2ee5534084bc162e6deaded36e93bb5e384
>>> and I have 3 flink-kubernetes-operator pods running. Recently, I deployed
>>> around 110 new FlinkDeployments, and I had no issues with this initial
>>> deployment. However, when I applied changes to all of these 110 new
>>> FlinkDeployments concurrently to update their container image, the
>>> flink-ku

Re: state.checkpoints.num-retained is set to 1, but checkpoints in the taskmanager keep growing and using up memory; how to fix this?

2023-11-07 Thread Yu Chen
Hi 嘉贤,

This is not expected behavior. Was the job manually cancelled at some point? In that case, Flink's default behavior is RETAINED_ON_CANCELLATION, and the checkpoints have to be cleaned up manually.
If you want checkpoints to be cleaned up after the job is cancelled, consider adjusting the option execution.checkpointing.externalized-checkpoint-retention [1].


[1] 
http://stream.devops.sit.xiaohongshu.com/docs/red/docs/deployment/config/#execution-checkpointing-externalized-checkpoint-retention

Best,
Yu Chen
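
A minimal flink-conf.yaml sketch of the settings discussed above (the values are illustrative, not a recommendation for your job):

```
# Delete externalized checkpoints when the job is cancelled,
# instead of the retain-on-cancellation behavior described above:
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
state.checkpoints.num-retained: 1
```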


> On Nov 8, 2023, at 13:08, 梁嘉贤  wrote:
> 
> Hi, let me correct my question: it is the number of checkpoints in the taskmanager that keeps growing and taking up disk space. Some additional information:
> I mounted the task manager's checkpoint path locally and used `du -h` to inspect the checkpoint directory; the job keeps adding chk-* directories, so the disk usage keeps growing, as shown in the image below.
> My question is: how can these historical chk files be deleted?
> <2dfdf...@88d32a26.b2174b65.png>
> 
> 
> 
> 
> 
> 梁嘉贤
> 深圳市城市交通规划设计研究中心有限公司/数据模型中心
>  
>  
>  
> -- Original --
> From:  "Yu Chen";
> Date:  Wed, Nov 8, 2023 12:59 PM
> To:  "梁嘉贤";
> Cc:  "user";
> Subject:  Re: state.checkpoints.num-retained is set to 1, but checkpoints in the taskmanager keep growing and using up memory; how to fix this?
>  
> Hi 嘉贤,
> 
> Regarding the checkpoint history shown in the Flink Web UI: the state.checkpoints.num-retained option controls how many checkpoints are kept in checkpoint storage, and Flink deletes older checkpoint files from checkpoint storage in a rolling fashion, but the records shown in the Web UI are not removed in the process (you can verify this by checking the path shown for each checkpoint record).
> Also, if you use the heap state backend, state lives in memory and checkpoints are flushed to files. The memory growth is therefore most likely caused by the job itself rather than by historical checkpoints (for example, a global window aggregation without state TTL). Locating the cause of the memory growth would require more information about the job.
> In addition, if you want to confirm whether the option has taken effect, you can check it in the JobManager's Configuration tab.
> 
> Best,
> Yu Chen
> 
> > On Nov 8, 2023, at 11:56, 梁嘉贤  wrote:
> > 
> > Hello,
> > We are running Flink 1.14, with separate jobmanager and taskmanager containers created via Docker; the docker-compose.yml is shown in figure 1 below.
> > In the configuration we set state.checkpoints.num-retained: 1, but the Web UI shows the checkpoints keep increasing (figure 2 below), and the number of checkpoints in the taskmanager container also keeps growing. How can we clean up these historical checkpoints?
> >   
> > <03b4c...@04cd7323.eb064b65.png>
> 



Re: state.checkpoints.num-retained is set to 1, but checkpoints in the taskmanager keep growing and using up memory; how to fix this?

2023-11-07 Thread Yu Chen
Hi 嘉贤,

Regarding the checkpoint history shown in the Flink Web UI: the state.checkpoints.num-retained option controls how many checkpoints are kept in checkpoint storage, and Flink deletes older checkpoint files from checkpoint storage in a rolling fashion, but the records shown in the Web UI are not removed in the process (you can verify this by checking the path shown for each checkpoint record).
Also, if you use the heap state backend, state lives in memory and checkpoints are flushed to files. The memory growth is therefore most likely caused by the job itself rather than by historical checkpoints (for example, a global window aggregation without state TTL). Locating the cause of the memory growth would require more information about the job.
In addition, if you want to confirm whether the option has taken effect, you can check it in the JobManager's Configuration tab.

Best,
Yu Chen

> On Nov 8, 2023, at 11:56, 梁嘉贤  wrote:
> 
> Hello,
> We are running Flink 1.14, with separate jobmanager and taskmanager containers created via Docker; the docker-compose.yml is shown in figure 1 below.
> In the configuration we set state.checkpoints.num-retained: 1, but the Web UI shows the checkpoints keep increasing (figure 2 below), and the number of checkpoints in the taskmanager container also keeps growing. How can we clean up these historical checkpoints?
>   
> <03b4c...@04cd7323.eb064b65.png>



Re: Error in /jars/upload curl request

2023-11-07 Thread Yu Chen
Hi Tauseef,

That's really dependent on the environment you're actually running in. But I'm 
guessing you're using ingress to route your requests to the JM POD. 
If so, I'd suggest you adjust the value of 
nginx.ingress.kubernetes.io/proxy-body-size.

Following is an example.
```
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
   kubernetes.io/ingress.class: nginx
   nginx.ingress.kubernetes.io/proxy-body-size: 250m # change this
  name: xxx
  namespace: xxx
spec:
  rules:
  - host: flink-nyquist.hvreaning.com
http:
 paths:
 - backend:
serviceName: xxx
servicePort: 8081
```

Please let me know if there are any other problems.

Best,
Yu Chen

> On Nov 7, 2023, at 18:40, Tauseef Janvekar  wrote:
> 
> Hi Chen,
> 
> We are not using nginx anywhere on the server(kubernetes cluster) or on my 
> client(my local machine).
> Not sure how to proceed on this.
> 
> Thanks,
> Tauseef
> 
> On Tue, 7 Nov 2023 at 13:36, Yu Chen  wrote:
> Hi Tauseef,
> 
> The error was caused by the nginx configuration and was not a flink problem.
> 
> You can find many related solutions on the web [1].
> 
> Best,
> Yu Chen
> 
> [1] 
> https://stackoverflow.com/questions/24306335/413-request-entity-too-large-file-upload-issue
> 
>> On Nov 7, 2023, at 15:14, Tauseef Janvekar  wrote:
>> 
>> Hi Chen,
>> 
>> Now I get a different error message.
>> root@R914SK4W:~/learn-building-flink-applications-in-java-exercises/exercises#
>>  curl -X POST -H "Expect:" -F "jarfile=@./target/travel-i
>> tinerary-0.1.jar" https://flink-nyquist.hvreaning.com/jars/upload
>> 
>> 413 Request Entity Too Large
>> 
>> 413 Request Entity Too Large
>> nginx
>> 
>> 
>> 
>> Thanks
>> Tauseef
>> 
>> On Tue, 7 Nov 2023 at 06:19, Chen Yu  wrote:
>>  Hi Tauseef,
>> 
>> Adding an @ sign before the path will resolve your problem. 
>> And I verified that both web and postman upload the jar file properly on the 
>> master branch code. 
>> If you are still having problems then you can provide some more detailed 
>> information.
>> 
>> Here are some documents of curl by `man curl`.
>> 
>>-F, --form  
>>   (HTTP SMTP IMAP) For HTTP protocol family, this lets curl 
>> emulate a filled-in form in which a user  has
>>   pressed the submit button. This causes curl to POST data using 
>> the Content-Type multipart/form-data ac‐
>>   cording to RFC 2388.
>> 
>>   For SMTP and IMAP protocols, this is the means to compose a 
>> multipart mail message to transmit.
>> 
>>   This enables uploading of binary files etc. To force the 
>> 'content' part to be a file, prefix  the  file
>>   name  with an @ sign. To just get the content part from a 
>> file, prefix the file name with the symbol <.
>>   The difference between @ and < is then that @ makes a file get 
>> attached in the post as a  file  upload,
>>   while the < makes a text field and just get the contents for 
>> that text field from a file.
>> 
>> 
>> Best,
>> Yu Chen
>> From: Tauseef Janvekar
>> Sent: November 6, 2023 22:27
>> To: user@flink.apache.org
>> Subject: Error in /jars/upload curl request
>>
>> I am using a curl request to upload a
>> jar but it throws the below error 
>> 
>> 
>> Received unknown attribute jarfile.
>> 
>> Not sure what is wrong here. I am following the standard documentation
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>> 
>> Please let me know if I have to use some other command to upload a jar using 
>> "/jars/upload" endpoint
>> 
>> I also tried to upload using webui but it hangs continuously and only calls 
>> GET api with 200 success- https://flink-nyquist.hvreaning.com/jars
>> 
>> Thanks,
>> Tauseef
> 



Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-07 Thread Yu Chen
Hi Arjun,

As stated in the document, 'This regex pattern should be matched with the 
absolute file path.'
Therefore, you should adjust your regular expression to match absolute paths.
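
For example (a rough sketch; the exact pattern depends on your directory layout), a pattern that matches the absolute path rather than just the file name could look like:

```
'source.path.regex-pattern' = '.*/customer.*'
```

i.e. allow any leading directories and then require the file name to start with "customer"; the rest of the DDL stays unchanged.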

Please let me know if there are any other problems.

Best,
Yu Chen

> On Nov 7, 2023, at 18:11, arjun s  wrote:
> 
> Hi Chen,
> I attempted to configure the 'source.path.regex-pattern' property in the 
> table settings as '^customer.*' to ensure that the Flink job only processes 
> file names starting with "customer" in the specified directory. However, it 
> appears that this configuration is not producing the expected results. Are 
> there any additional configurations or adjustments that need to be made? The 
> table script I used is as follows:
> CREATE TABLE sample (
>   col1 STRING,
>   col2 STRING,
>   col3 STRING,
>   col4 STRING,
>   file.path STRING NOT NULL METADATA
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'file:///home/techuser/inputdata',
>   'format' = 'csv',
>   'source.path.regex-pattern' = '^customer.*',
>   'source.monitor-interval' = '1'
> )
> Thanks in advance,
> Arjun
> 
> On Mon, 6 Nov 2023 at 20:56, Chen Yu  wrote:
> Hi Arjun,
> 
> If you can filter files by a regex pattern, I think the config 
> `source.path.regex-pattern`[1] maybe what you want.
> 
>   'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
> files to read under the 
> -- directory of `path` option. This 
> regex pattern should be
> -- matched with the absolute file 
> path. If this option is set,
> -- the connector  will recursive all 
> files under the directory
> -- of `path` option
> 
> Best,
> Yu Chen
> 
> 
> [1] 
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/
> 
> From: arjun s
> Sent: November 6, 2023 20:50
> To: user@flink.apache.org
> Subject: Handling Schema Variability and Applying Regex Patterns in Flink Job
> Configuration
>
> Hi team,
> I'm currently utilizing the Table API function within my Flink job, with the 
> objective of reading records from CSV files located in a source directory. To 
> obtain the file names, I'm creating a table and specifying the schema using 
> the Table API in Flink. Consequently, when the schema matches, my Flink job 
> successfully submits and executes as intended. However, in cases where the 
> schema does not match, the job fails to submit. Given that the schema of the 
> files in the source directory is unpredictable, I'm seeking a method to 
> handle this situation.
> Create table query
> =
> CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4 
> STRING,file.path` STRING NOT NULL METADATA) WITH ('connector' = 
> 'filesystem','path' = 'file:///home/techuser/inputdata','format' = 
> 'csv','source.monitor-interval' = '1')
> =
> 
> Furthermore, I have a question about whether there's a way to read files from 
> the source directory based on a specific regex pattern. This is relevant in 
> our situation because only file names that match a particular pattern need to 
> be processed by the Flink job.
> 
> Thanks and Regards,
> Arjun



Re: Error in /jars/upload curl request

2023-11-07 Thread Yu Chen
Hi Tauseef,

The error was caused by the nginx configuration and was not a flink problem.

You can find many related solutions on the web [1].

Best,
Yu Chen

[1] 
https://stackoverflow.com/questions/24306335/413-request-entity-too-large-file-upload-issue

> On Nov 7, 2023, at 15:14, Tauseef Janvekar  wrote:
> 
> Hi Chen,
> 
> Now I get a different error message.
> root@R914SK4W:~/learn-building-flink-applications-in-java-exercises/exercises#
>  curl -X POST -H "Expect:" -F "jarfile=@./target/travel-i
> tinerary-0.1.jar" https://flink-nyquist.hvreaning.com/jars/upload
> 
> 413 Request Entity Too Large
> 
> 413 Request Entity Too Large
> nginx
> 
> 
> 
> Thanks
> Tauseef
> 
> On Tue, 7 Nov 2023 at 06:19, Chen Yu  <mailto:yuchen.e...@gmail.com>> wrote:
>>  Hi Tauseef,
>> 
>> Adding an @ sign before the path will resolve your problem. 
>> And I verified that both web and postman upload the jar file properly on the 
>> master branch code. 
>> If you are still having problems then you can provide some more detailed 
>> information.
>> 
>> Here are some documents of curl by `man curl`.
>> 
>>-F, --form 
>>   (HTTP SMTP IMAP) For HTTP protocol family, this lets curl 
>> emulate a filled-in form in which a user  has
>>   pressed the submit button. This causes curl to POST data using 
>> the Content-Type multipart/form-data ac‐
>>   cording to RFC 2388.
>> 
>>   For SMTP and IMAP protocols, this is the means to compose a 
>> multipart mail message to transmit.
>> 
>>   This enables uploading of binary files etc. To force the 
>> 'content' part to be a file, prefix  the  file
>>   name  with an @ sign. To just get the content part from a 
>> file, prefix the file name with the symbol <.
>>   The difference between @ and < is then that @ makes a file get 
>> attached in the post as a  file  upload,
>>   while the < makes a text field and just get the contents for 
>> that text field from a file.
>> 
>> 
>> Best,
>> Yu Chen
>> From: Tauseef Janvekar
>> Sent: November 6, 2023 22:27
>> To: user@flink.apache.org
>> Subject: Error in /jars/upload curl request
>> I am using curl request to upload a jar but it throws the below error
>> 
>> 
>> Received unknown attribute jarfile.
>> 
>> Not sure what is wrong here. I am following the standard documentation
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/
>> 
>> Please let me know if I have to use some other command to upload a jar using 
>> "/jars/upload" endpoint
>> 
>> I also tried to upload using webui but it hangs continuously and only calls 
>> GET api with 200 success- https://flink-nyquist.hvreaning.com/jars
>> 
>> Thanks,
>> Tauseef



Re: Error in /jars/upload curl request

2023-11-06 Thread Chen Yu
 Hi Tauseef,

Adding an @ sign before the path will resolve your problem.
And I verified that both web and postman upload the jar file properly on the 
master branch code.
If you are still having problems then you can provide some more detailed 
information.

Here are some documents of curl by `man curl`.

   -F, --form 
  (HTTP SMTP IMAP) For HTTP protocol family, this lets curl emulate 
a filled-in form in which a user  has
  pressed the submit button. This causes curl to POST data using 
the Content-Type multipart/form-data ac-
  cording to RFC 2388.

  For SMTP and IMAP protocols, this is the means to compose a 
multipart mail message to transmit.

  This enables uploading of binary files etc. To force the 
'content' part to be a file, prefix  the  file
  name  with an @ sign. To just get the content part from a file, 
prefix the file name with the symbol <.
  The difference between @ and < is then that @ makes a file get 
attached in the post as a  file  upload,
  while the < makes a text field and just get the contents for that 
text field from a file.
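
Concretely, the upload command from earlier in this thread would then look something like the following (the endpoint and jar path are placeholders for your own environment):

```
curl -X POST -H "Expect:" \
  -F "jarfile=@./target/travel-itinerary-0.1.jar" \
  https://<your-flink-rest-endpoint>/jars/upload
```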


Best,
Yu Chen

From: Tauseef Janvekar
Sent: November 6, 2023 22:27
To: user@flink.apache.org
Subject: Error in /jars/upload curl request

I am using curl request to upload a jar but it throws the below error

[image.png]
Received unknown attribute jarfile.

Not sure what is wrong here. I am following the standard documentation
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/

Please let me know if I have to use some other command to upload a jar using 
"/jars/upload" endpoint

I also tried to upload using webui but it hangs continuously and only calls GET 
api with 200 success- https://flink-nyquist.hvreaning.com/jars

Thanks,
Tauseef


Re: Handling Schema Variability and Applying Regex Patterns in Flink Job Configuration

2023-11-06 Thread Chen Yu
Hi Arjun,

If you can filter files by a regex pattern, I think the config 
`source.path.regex-pattern`[1] maybe what you want.


  'source.path.regex-pattern' = '...',  -- optional: regex pattern to filter 
files to read under the
-- directory of `path` option. This 
regex pattern should be
-- matched with the absolute file path. 
If this option is set,
-- the connector  will recursive all 
files under the directory
-- of `path` option

Best,
Yu Chen


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/filesystem/


From: arjun s
Sent: November 6, 2023 20:50
To: user@flink.apache.org
Subject: Handling Schema Variability and Applying Regex Patterns in Flink Job
Configuration

Hi team,
I'm currently utilizing the Table API function within my Flink job, with the 
objective of reading records from CSV files located in a source directory. To 
obtain the file names, I'm creating a table and specifying the schema using the 
Table API in Flink. Consequently, when the schema matches, my Flink job 
successfully submits and executes as intended. However, in cases where the 
schema does not match, the job fails to submit. Given that the schema of the 
files in the source directory is unpredictable, I'm seeking a method to handle 
this situation.
Create table query
=
CREATE TABLE sample (col1 STRING,col2 STRING,col3 STRING,col4 STRING,file.path` 
STRING NOT NULL METADATA) WITH ('connector' = 'filesystem','path' = 
'file:///home/techuser/inputdata','format' = 'csv','source.monitor-interval' = 
'1')
=

Furthermore, I have a question about whether there's a way to read files from 
the source directory based on a specific regex pattern. This is relevant in our 
situation because only file names that match a particular pattern need to be 
processed by the Flink job.

Thanks and Regards,
Arjun


Re: Auditing sink using table api

2023-11-04 Thread Chen Yu
Hi Bo,

How about writing the data to the Print connector [1] simultaneously via
insertInto [2]? It will print the data into the TaskManager's log.
Of course, you can choose an appropriate connector according to your audit log 
storage.

Best,
Yu Chen

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/print/
[2]https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/common/#emit-a-table
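
A minimal SQL sketch of that idea; the source and JDBC sink table names below are made up for illustration:

```
-- Audit side output: every row also goes to the TaskManager logs
CREATE TABLE audit_print (
  id STRING,
  payload STRING
) WITH ('connector' = 'print');

-- One statement set so the same rows feed both the real sink and the audit sink
EXECUTE STATEMENT SET
BEGIN
  INSERT INTO jdbc_sink   SELECT id, payload FROM source_table;
  INSERT INTO audit_print SELECT id, payload FROM source_table;
END;
```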


From: Bo <99...@qq.com>
Sent: November 4, 2023 13:53
To: user
Subject: Auditing sink using table api

Hello community,

I am looking for a way to perform auditing of the various sinks (mostly 
JdbcDynamicTableSink) using the table api.
By "auditing", I mean to log details of every row data coming into the sink, 
and any anormalies when the sink write to external systems.

Does flink have some kind of auditing mechanism in place? The only way I could 
see now is to make a custom sink that supports detail logging to external 
systems.

Any thoughts/suggestions?

Regards,

Bo


Re: Inquiry about ActiveResourceManager and StandaloneResourceManager in Flink

2023-11-03 Thread Yu Chen
Hi Steven,

As stated in the `StandaloneResourceManager` comments, the manager does not
acquire new resources and the user needs to manually start the Taskmanager
by themselves.
`ActiveResourceManager`, on the other hand, requests or releases resources on
demand (that's what active means) based on resource frameworks (like
yarn and k8s).

As we know, different users have different environments in production, and
not all of them want to run in yarn or k8s (especially for local debugging,
Standalone Cluster is very convenient).
Therefore, Flink provides users with these two different resource managers
to deal with different usage scenarios.

Please feel free to correct me if there are any misunderstandings.

Best regards,
Yu Chen

Steven Chen  wrote on Fri, Nov 3, 2023 at 13:28:

> Dear Flink Community,
>
>
> I am currently using Flink for my project and have a question regarding
> ActiveResourceManager and StandaloneResourceManager.
>
> What does "active" mean in ActiveResourceManager and why is
> StandaloneResourceManager not considered an active resource manager?
>
>
> Thank you for your time and assistance.
>
>
> Best regards,
> Steven Chen
>


Inquiry about ActiveResourceManager and StandaloneResourceManager in Flink

2023-11-02 Thread Steven Chen
Dear Flink Community,




I am currently using Flink for my project and have a question regarding 
ActiveResourceManager and StandaloneResourceManager.

What does "active" mean in ActiveResourceManager and why is 
StandaloneResourceManager not considered an active resource manager?




Thank you for your time and assistance.




Best regards,
Steven Chen

Re: flink-kubernetes-operator cannot handle SPECCHANGE for 100+ FlinkDeployments concurrently

2023-11-02 Thread Tony Chen
One of the operator pods logged the following exception before the
container restarted:

2023-11-01 14:24:21,260 o.a.f.s.n.i.n.c.AbstractChannel
[WARN ] Force-closing a channel whose registration task was not
accepted by an event loop: [id: 0x1a7718c1]
java.util.concurrent.RejectedExecutionException: event executor terminated

I did notice that all of our 3 operator pods were reconciling
FlinkDeployments, and this definitely is an issue. After I churned 2 of the
pods, there was only 1 pod that was the leader, and this operator pod was
able to reconcile SPECCHANGES of FlinkDeployments again.

Are there any recommendations on how I can enforce only 1 pod to be the
leader? For example, would increasing the lease-duration help?

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/configuration/

On Wed, Nov 1, 2023 at 11:16 PM Tony Chen  wrote:

> Hi Flink Community,
>
> I am currently running flink-kubernetes-operator 1.6-patched (
> https://github.com/apache/flink-kubernetes-operator/commit/3f0dc2ee5534084bc162e6deaded36e93bb5e384),
> and I have 3 flink-kubernetes-operator pods running. Recently, I deployed
> around 110 new FlinkDeployments, and I had no issues with this initial
> deployment. However, when I applied changes to all of these 110 new
> FlinkDeployments concurrently to update their container image, the
> flink-kubernetes-operator pods seemed to be in conflict with each other
> constantly.
>
> For example, before the SPECCHANGE, FlinkDeployment rh-flinkdeployment-01
> would be RUNNING (status.jobStatus.state) and STABLE
> (status.lifecycleState). After the FlinkDeployment spec is updated,
> rh-flinkdeployment-01 goes through FINISHED (status.jobStatus.state) and
> UPGRADING (status.jobStatus.state), and then RECONCILING
> (status.jobStatus.state) and DEPLOYED (status.jobStatus.state). It reaches
> RUNNING and STABLE again, but then for some reason it goes back to FINISHED
> and UPGRADING again, and I do notice that the newly created jobmanager pod
> gets deleted and then recreated. rh-flinkdeployment-01 basically becomes
> stuck in this loop where it becomes stable and then gets re-deployed by the
> operator.
>
> This doesn't happen to all 110 FlinkDeployments, but it happens to around
> 30 of them concurrently.
>
> I have pasted some logs from one of the operator pods on one of the
> FlinkDeployments. I have also highlighted messages that seem suspicious to
> me. I will try to gather more logs and send them tomorrow.
>
> For now, to mitigate this, I had to delete all of these FlinkDeployments
> and run them with the deprecated GoogleCloudPlatform operator. I'm hoping
> to resolve this soon so that I don't have to run anything on the
> GoogleCloudPlatform operator anymore.
>
> Thanks!
> Tony
>
>
> 2023-11-02 05:26:02,132
> i.j.o.p.e.ReconciliationDispatcher
> [ERROR][/] Error during event processing
> ExecutionScope{ resource id: ResourceID{name=' namespace=''}, version: 17772349729} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> org.apache.flink.kubernetes.operator.exception.StatusConflictException:
> Status have been modified externally in version 17772349851 Previous:
> 
> ...
> 2023-11-02 05:27:25,945 o.a.f.k.o.o.d.ApplicationObserver [WARN
> ][/] *Running deployment generation -1
> doesn't match upgrade target generation 2.*
> 2023-11-02 05:27:25,946 o.a.f.c.Configuration  [WARN
> ][/] Config uses deprecated configuration key
> 'high-availability' instead of proper key 'high-availability.type'
> 2023-11-02 05:27:26,034 o.a.f.k.o.l.AuditUtils [INFO
> ][/] >>> Status | Info| UPGRADING   |
> The resource is being upgraded
> 2023-11-02 05:27:26,057 o.a.f.k.o.l.AuditUtils [INFO
> ][/] >>> Event  | Info| SUBMIT  |
> Starting deployment
> 2023-11-02 05:27:26,057 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][/] Deploying application cluster requiring
> last-state from HA metadata
> 2023-11-02 05:27:26,057 o.a.f.c.Configuration  [WARN
> ][/] Config uses deprecated configuration key
> 'high-availability' instead of proper key 'high-availability.type'
> 2023-11-02 05:27:26,084 o.a.f.c.Configuration  [WARN
> ][/] Config uses deprecated configuration key
> 'high-availability' instead of proper key 'high-availability.type'
> 2023-11-02 05:27:26,110 o.a.f.k.o.s.NativeFlinkService [INFO
> ][/] Deploying application cluster
> 2023-11-02 05:27:26,110 o.a.f.c.d.a.c.ApplicationClusterDeployer [INFO
> ][/] Submitting application in 'Application
> Mode'.
> 2023-11-02 05:27:26,112 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
> ][/] The derived from fraction jvm overhead
> memory (1.000gb (1073741840

Fwd: flink-kubernetes-operator cannot handle SPECCHANGE for 100+ FlinkDeployments concurrently

2023-11-02 Thread Tony Chen
Hi Flink Community,

I am currently running flink-kubernetes-operator 1.6-patched (
https://github.com/apache/flink-kubernetes-operator/commit/3f0dc2ee5534084bc162e6deaded36e93bb5e384),
and I have 3 flink-kubernetes-operator pods running. Recently, I deployed
around 110 new FlinkDeployments, and I had no issues with this initial
deployment. However, when I applied changes to all of these 110 new
FlinkDeployments concurrently to update their container image, the
flink-kubernetes-operator pods seemed to be in conflict with each other
constantly.

For example, before the SPECCHANGE, FlinkDeployment rh-flinkdeployment-01
would be RUNNING (status.jobStatus.state) and STABLE
(status.lifecycleState). After the FlinkDeployment spec is updated,
rh-flinkdeployment-01 goes through FINISHED (status.jobStatus.state) and
UPGRADING (status.jobStatus.state), and then RECONCILING
(status.jobStatus.state) and DEPLOYED (status.jobStatus.state). It reaches
RUNNING and STABLE again, but then for some reason it goes back to FINISHED
and UPGRADING again, and I do notice that the newly created jobmanager pod
gets deleted and then recreated. rh-flinkdeployment-01 basically becomes
stuck in this loop where it becomes stable and then gets re-deployed by the
operator.

This doesn't happen to all 110 FlinkDeployments, but it happens to around
30 of them concurrently.

I have pasted some logs from one of the operator pods on one of the
FlinkDeployments. I have also highlighted messages that seem suspicious to
me. I will try to gather more logs and send them tomorrow.

For now, to mitigate this, I had to delete all of these FlinkDeployments
and run them with the deprecated GoogleCloudPlatform operator. I'm hoping
to resolve this soon so that I don't have to run anything on the
GoogleCloudPlatform operator anymore.

Thanks!
Tony


2023-11-02 05:26:02,132
i.j.o.p.e.ReconciliationDispatcher
[ERROR][/] Error during event processing
ExecutionScope{ resource id: ResourceID{name='
...
2023-11-02 05:27:25,945 o.a.f.k.o.o.d.ApplicationObserver [WARN
][/] *Running deployment generation -1 doesn't
match upgrade target generation 2.*
2023-11-02 05:27:25,946 o.a.f.c.Configuration  [WARN
][/] Config uses deprecated configuration key
'high-availability' instead of proper key 'high-availability.type'
2023-11-02 05:27:26,034 o.a.f.k.o.l.AuditUtils [INFO
][/] >>> Status | Info| UPGRADING   |
The resource is being upgraded
2023-11-02 05:27:26,057 o.a.f.k.o.l.AuditUtils [INFO
][/] >>> Event  | Info| SUBMIT  |
Starting deployment
2023-11-02 05:27:26,057 o.a.f.k.o.s.AbstractFlinkService [INFO
][/] Deploying application cluster requiring
last-state from HA metadata
2023-11-02 05:27:26,057 o.a.f.c.Configuration  [WARN
][/] Config uses deprecated configuration key
'high-availability' instead of proper key 'high-availability.type'
2023-11-02 05:27:26,084 o.a.f.c.Configuration  [WARN
][/] Config uses deprecated configuration key
'high-availability' instead of proper key 'high-availability.type'
2023-11-02 05:27:26,110 o.a.f.k.o.s.NativeFlinkService [INFO
][/] Deploying application cluster
2023-11-02 05:27:26,110 o.a.f.c.d.a.c.ApplicationClusterDeployer [INFO
][/] Submitting application in 'Application
Mode'.
2023-11-02 05:27:26,112 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
][/] The derived from fraction jvm overhead
memory (1.000gb (1073741840 bytes)) is greater than its max value
1024.000mb (1073741824 bytes), max value will be used instead
2023-11-02 05:27:26,112 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO
][/] The derived from fraction jvm overhead
memory (1.000gb (1073741840 bytes)) is greater than its max value
1024.000mb (1073741824 bytes), max value will be used instead
2023-11-02 05:27:26,163 o.a.f.k.o.s.AbstractFlinkService [INFO
][/] Waiting for cluster shutdown... (30s)
2023-11-02 05:27:26,193 o.a.f.k.o.l.AuditUtils [INFO
][/] >>> Event  | Warning |
*CLUSTERDEPLOYMENTEXCEPTION
| The Flink cluster  already exists.*
2023-11-02 05:27:26,193 o.a.f.k.o.r.ReconciliationUtils [WARN
][/] Attempt count: 0, last attempt: false
2023-11-02 05:27:26,277 o.a.f.k.o.l.AuditUtils [INFO
][/] *>>> Status | Error   | UPGRADING   |
{"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException:
The Flink cluster  already
exists.","additionalMetadata":{},"throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"The
Flink cluster  already exists.","additionalMetadata":{}}]}*


-- 


Tony Chen

Software Engineer

Menlo Park, CA



Re: [DISCUSS][FLINK-33240] Document deprecated options as well

2023-11-01 Thread Zhanghao Chen
Hi Samrat and Ruan,

Thanks for the suggestion. I'm actually in favor of adding the deprecated 
options in the same section as the non-deprecated ones. This would make user 
search for descriptions of the replacement options more easily. It would be a 
different story for options deprecated because the related API/module is 
entirely deprecated, e.g. DataSet API. In that case, users would not search for 
replacement on an individual option but rather need to migrate to a new API, 
and it would be better to move these options to a separate section. WDYT?

Best,
Zhanghao Chen

From: Samrat Deb 
Sent: Wednesday, November 1, 2023 15:31
To: d...@flink.apache.org 
Cc: user@flink.apache.org 
Subject: Re: [DISCUSS][FLINK-33240] Document deprecated options as well

Thanks for the proposal ,
+1 for adding deprecated identifier

[Thought] Can we have seperate section / page for deprecated configs ? Wdut
?


Bests,
Samrat


On Tue, 31 Oct 2023 at 3:44 PM, Alexander Fedulov <
alexander.fedu...@gmail.com> wrote:

> Hi Zhanghao,
>
> Thanks for the proposition.
> In general +1, this sounds like a good idea as long it is clear that the
> usage of these settings is discouraged.
> Just one minor concern - the configuration page is already very long, do
> you have a rough estimate of how many more options would be added with this
> change?
>
> Best,
> Alexander Fedulov
>
> On Mon, 30 Oct 2023 at 18:24, Matthias Pohl  .invalid>
> wrote:
>
> > Thanks for your proposal, Zhanghao Chen. I think it adds more
> transparency
> > to the configuration documentation.
> >
> > +1 from my side on the proposal
> >
> > On Wed, Oct 11, 2023 at 2:09 PM Zhanghao Chen  >
> > wrote:
> >
> > > Hi Flink users and developers,
> > >
> > > Currently, Flink won't generate doc for the deprecated options. This
> > might
> > > confuse users when upgrading from an older version of Flink: they have
> to
> > > either carefully read the release notes or check the source code for
> > > upgrade guidance on deprecated options.
> > >
> > > I propose to document deprecated options as well, with a "(deprecated)"
> > > tag placed at the beginning of the option description to highlight the
> > > deprecation status [1].
> > >
> > > Looking forward to your feedbacks on it.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-33240
> > >
> > > Best,
> > > Zhanghao Chen
> > >
> >
>


Re: [DISCUSS][FLINK-33240] Document deprecated options as well

2023-11-01 Thread Zhanghao Chen
Hi Alexander,

I haven't done a complete analysis yet. But through simple code search, roughly 
35 options would be added with this change. Also note that some old options 
defined in a ConfigConstant class won't be added here, as flink-doc won't
discover these constant-based options.

Best,
Zhanghao Chen

From: Alexander Fedulov 
Sent: Tuesday, October 31, 2023 18:12
To: d...@flink.apache.org 
Cc: user@flink.apache.org 
Subject: Re: [DISCUSS][FLINK-33240] Document deprecated options as well

Hi Zhanghao,

Thanks for the proposition.
In general +1, this sounds like a good idea as long it is clear that the usage 
of these settings is discouraged.
Just one minor concern - the configuration page is already very long, do you 
have a rough estimate of how many more options would be added with this change?

Best,
Alexander Fedulov

On Mon, 30 Oct 2023 at 18:24, Matthias Pohl  
wrote:
Thanks for your proposal, Zhanghao Chen. I think it adds more transparency
to the configuration documentation.

+1 from my side on the proposal

On Wed, Oct 11, 2023 at 2:09 PM Zhanghao Chen 
mailto:zhanghao.c...@outlook.com>>
wrote:

> Hi Flink users and developers,
>
> Currently, Flink won't generate doc for the deprecated options. This might
> confuse users when upgrading from an older version of Flink: they have to
> either carefully read the release notes or check the source code for
> upgrade guidance on deprecated options.
>
> I propose to document deprecated options as well, with a "(deprecated)"
> tag placed at the beginning of the option description to highlight the
> deprecation status [1].
>
> Looking forward to your feedbacks on it.
>
> [1] https://issues.apache.org/jira/browse/FLINK-33240
>
> Best,
> Zhanghao Chen
>


FLINK-24035, how can this issue be repeated?

2023-10-24 Thread rui chen
   We encountered similar problems in production, and we want to integrate
   FLINK-24035 to solve them, but we don't know how to reproduce the problem.


Re: Problems with the state.backend.fs.memory-threshold parameter

2023-10-23 Thread rui chen
Hi,Zakelly
Thank you for your answer.

Best,
rui


Zakelly Lan  wrote on Fri, Oct 13, 2023 at 19:12:

> Hi rui,
>
> The 'state.backend.fs.memory-threshold' configures the threshold below
> which state is stored as part of the metadata, rather than in separate
> files. So as a result the JM will use its memory to merge small
> checkpoint files and write them into one file. Currently the
> FLIP-306[1][2] is proposed to merge small checkpoint files without
> consuming JM memory. This feature is currently being worked on and is
> targeted for the next minor release (1.19).
>
>
> Best,
> Zakelly
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints
> [2] https://issues.apache.org/jira/browse/FLINK-32070
>
> On Fri, Oct 13, 2023 at 6:28 PM rui chen  wrote:
> >
> > We found that for some tasks, the JM memory continued to increase. I set
> > the parameter of state.backend.fs.memory-threshold to 0, and the JM
> memory
> > would no longer increase, but many small files might be written in this
> > way. Does the community have any optimization plan for this area?
>


Re: Issue with flink-kubernetes-operator not updating execution.savepoint.path after savepoint deletion

2023-10-21 Thread Tony Chen
jobmanager.owner.reference: 
>>> metrics.reporter.prom.port: 9090
>>> taskmanager.memory.process.size: 10G
>>> kubernetes.internal.jobmanager.entrypoint.class:
>>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint
>>> pipeline.name: 
>>> execution.savepoint.path: s3:///savepoint-bad5e5-6ab08cf0808e
>>> kubernetes.pod-template-file:
>>> /tmp/flink_op_generated_podTemplate_12924532349572558288.yaml
>>> state.backend.rocksdb.localdir: /rocksdb/
>>> kubernetes.pod-template-file.taskmanager:
>>> /tmp/flink_op_generated_podTemplate_1129545383743356980.yaml
>>> web.cancel.enable: false
>>> execution.checkpointing.timeout: 5 min
>>> kubernetes.container.image.pull-policy: IfNotPresent
>>> $internal.pipeline.job-id: bad5e5682b8f4fbefbf75b00d285ac10
>>> kubernetes.jobmanager.cpu: 2.0
>>> state.backend: filesystem
>>> $internal.flink.version: v1_14
>>> kubernetes.pod-template-file.jobmanager:
>>> /tmp/flink_op_generated_podTemplate_824610597202468981.yaml
>>> blob.server.port: 6124
>>> kubernetes.jobmanager.annotations:
>>> flinkdeployment.flink.apache.org/generation:14
>>> metrics.scope.operator:
>>> flink.taskmanager.job..operator..metric
>>> state.savepoints.dir: s3:///savepoints
>>> kubernetes.taskmanager.cpu: 2.0
>>> execution.savepoint.ignore-unclaimed-state: true
>>> $internal.application.program-args:
>>> kubernetes.container.image: 
>>> taskmanager.numberOfTaskSlots: 1
>>> metrics.scope.jm.job: flink.jobmanager.job..metric
>>> kubernetes.rest-service.exposed.type: ClusterIP
>>> metrics.reporter.prom.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporter
>>> $internal.application.main: 
>>> metrics.scope.jm: flink.jobmanager.metric
>>> execution.target: kubernetes-application
>>> jobmanager.memory.process.size: 10G
>>> metrics.scope.tm.job: flink.taskmanager.job..metric
>>> taskmanager.rpc.port: 6122
>>> internal.cluster.execution-mode: NORMAL
>>> execution.checkpointing.externalized-checkpoint-retention:
>>> RETAIN_ON_CANCELLATION
>>> pipeline.jars: local:///build/flink/usrlib/.jar
>>> state.checkpoints.dir: s3:///checkpoints
>>>
>>> At the time of the issue, here is our FlinkDeployment Spec:
>>>
>>> Spec:
>>>   Flink Configuration:
>>> execution.checkpointing.timeout:  5 min
>>> kubernetes.operator.job.restart.failed:   true
>>> kubernetes.operator.periodic.savepoint.interval:  600s
>>> metrics.reporter.prom.class:
>>>  org.apache.flink.metrics.prometheus.PrometheusReporter
>>> metrics.reporter.prom.port:   9090
>>> metrics.reporters:prom
>>> metrics.scope.jm:
>>> flink.jobmanager.metric
>>> metrics.scope.jm.job:
>>> flink.jobmanager.job..metric
>>> metrics.scope.operator:
>>> flink.taskmanager.job..operator..metric
>>> metrics.scope.task:
>>> flink.taskmanager.job..task..metric
>>> metrics.scope.tm:
>>> flink.taskmanager.metric
>>> metrics.scope.tm.job:
>>> flink.taskmanager.job..metric
>>> pipeline.auto-generate-uids:  false
>>> pipeline.name:
>>> state.backend:filesystem
>>> state.backend.rocksdb.localdir:   /rocksdb/
>>> state.checkpoints.dir:
>>>  s3:///checkpoints
>>> state.savepoints.dir:
>>> s3:///savepoints
>>>   Flink Version:  v1_14
>>>   Image:  
>>>   Image Pull Policy:  IfNotPresent
>>>   Job:
>>> Allow Non Restored State:  true
>>> Args:
>>> Entry Class: 
>>> Initial Savepoint Path:
>>>  s3a:///savepoint-bad5e5-577c6a76aec5
>>> Jar URI: local:///build/flink/usrlib/.jar
>>> Parallelism: 2
>>> State:   running
>>> Upgrade Mode:savepoint
>>>
>>>
>>>
>>
>> --
>>
>>
>> Nathan Moderwell
>>
>> Senior Machine Learning Engineer
>>
>> Menlo Park, CA
>>
>>
>

-- 


Tony Chen

Software Engineer

Menlo Park, CA



Re: Flink Operator 1.6 causes JobManagerDeploymentStatus: MISSING

2023-10-18 Thread Tony Chen
I did see another email thread that mentions instructions on getting the
image from this link:
https://github.com/apache/flink-kubernetes-operator/pkgs/container/flink-kubernetes-operator/127962962?tag=3f0dc2e
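
If that image is used, pointing the Helm chart at it would look roughly like the following; the repository and tag values are assumptions derived from the link above, not verified:

```
helm upgrade --install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
  --set image.repository=ghcr.io/apache/flink-kubernetes-operator \
  --set image.tag=3f0dc2e
```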

On Wed, Oct 18, 2023 at 6:25 PM Tony Chen  wrote:

> We're using the Helm chart to deploy the operator right now, and the image
> that I'm using was downloaded from Docker Hub:
> https://hub.docker.com/r/apache/flink-kubernetes-operator/tags. I
> wouldn't be able to use the release-1.6 branch (
> https://github.com/apache/flink-kubernetes-operator/commits/release-1.6)
> to pick up the fix, unless I'm missing something.
>
> I was attempting to roll back the operator version to 1.4 today, and I ran
> into the following issues on some operator pods. I was wondering if you have
> seen these Lease issues before.
>
> 2023-10-18 21:01:15,251 i.f.k.c.e.l.LeaderElector  [ERROR] Exception
> occurred while releasing lock 'LeaseLock: flink-kubernetes-operator -
> flink-operator-lease (flink-kubernetes-operator-74f9688dd-bcqr2)'
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
> Unable to update LeaseLock
> at
> io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:102)
> at
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.release(LeaderElector.java:139)
> at
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:120)
> at
> io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$start$2(LeaderElector.java:104)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
> Source)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source)
> at
> java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
> Source)
> at io.fabric8.kubernetes.client.utils.Utils.lambda$null$12(Utils.java:523)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
> Source)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
> Source)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
> Source)
> at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
> Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
> Source)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure
> executing: PUT at:
> https://10.241.0.1/apis/coordination.k8s.io/v1/namespaces/flink-kubernetes-operator/leases/flink-operator-lease.
> Message: Operation cannot be fulfilled on leases.coordination.k8s.io 
> "flink-operator-lease":
> the object has been modified; please apply your changes to the latest
> version and try again. Received status: Status(apiVersion=v1, code=409,
> details=StatusDetails(causes=[], group=coordination.k8s.io, kind=leases,
> name=flink-operator-lease, retryAfterSeconds=null, uid=null,
> additionalProperties={}), kind=Status, message=Operation cannot be
> fulfilled on leases.coordination.k8s.io "flink-operator-lease": the
> object has been modified; please apply your changes to the latest version
> and try again, metadata=ListMeta(_continue=null, remainingItemCount=null,
> resourceVersion=null, selfLink=null, additionalProperties={}),
> reason=Conflict, status=Failure, additionalProperties={}).
>
> On Wed, Oct 18, 2023 at 2:55 PM Gyula Fóra  wrote:
>
>> Hi!
>> Not sure if it’s the same but could you try picking up the fix from the
>> release branch and confirming that it solves the problem?
>>
>> If it does we may consider a quick bug fix release.
>>
>> Cheers
>> Gyula
>>
>> On Wed, 18 Oct 2023 at 18:09, Tony Chen  wrote:
>>
>>> Hi Flink Community,
>>>
>>> Most of the Flink applications run on 1.14 at my company. After
>>> upgrading the Flink Operator to 1.6, we've seen many jobmanager pods show
>>> "JobManagerDeploymentStatus: MISSING".
>>>
>>> Here are some logs from the operator pod on one of our Flink
>>> applications:
>>>
>>> 2023-10-18 02:02:40,823 o.a.f.k.o.l.AuditUtils
>>> [INFO ][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
>>> SAVEPOINTERROR | Savepoint failed for savepointTriggerNonce: null
>>> ...
>>> 2023-10-18 02:02:40,883 o.a.f.k.o.l.AuditUtils

Re: Flink Operator 1.6 causes JobManagerDeploymentStatus: MISSING

2023-10-18 Thread Tony Chen
We're using the Helm chart to deploy the operator right now, and the image
that I'm using was downloaded from Docker Hub:
https://hub.docker.com/r/apache/flink-kubernetes-operator/tags. I wouldn't
be able to use the release-1.6 branch (
https://github.com/apache/flink-kubernetes-operator/commits/release-1.6) to
pick up the fix, unless I'm missing something.

I was attempting to roll back the operator version to 1.4 today, and I ran
into the following issues on some operator pods. I was wondering if you have
seen these Lease issues before.

2023-10-18 21:01:15,251 i.f.k.c.e.l.LeaderElector  [ERROR] Exception
occurred while releasing lock 'LeaseLock: flink-kubernetes-operator -
flink-operator-lease (flink-kubernetes-operator-74f9688dd-bcqr2)'
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
Unable to update LeaseLock
at
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock.update(LeaseLock.java:102)
at
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.release(LeaderElector.java:139)
at
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.stopLeading(LeaderElector.java:120)
at
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector.lambda$start$2(LeaderElector.java:104)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown
Source)
at io.fabric8.kubernetes.client.utils.Utils.lambda$null$12(Utils.java:523)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown
Source)
at
java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown
Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown
Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown
Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure
executing: PUT at:
https://10.241.0.1/apis/coordination.k8s.io/v1/namespaces/flink-kubernetes-operator/leases/flink-operator-lease.
Message: Operation cannot be fulfilled on leases.coordination.k8s.io
"flink-operator-lease":
the object has been modified; please apply your changes to the latest
version and try again. Received status: Status(apiVersion=v1, code=409,
details=StatusDetails(causes=[], group=coordination.k8s.io, kind=leases,
name=flink-operator-lease, retryAfterSeconds=null, uid=null,
additionalProperties={}), kind=Status, message=Operation cannot be
fulfilled on leases.coordination.k8s.io "flink-operator-lease": the object
has been modified; please apply your changes to the latest version and try
again, metadata=ListMeta(_continue=null, remainingItemCount=null,
resourceVersion=null, selfLink=null, additionalProperties={}),
reason=Conflict, status=Failure, additionalProperties={}).

On Wed, Oct 18, 2023 at 2:55 PM Gyula Fóra  wrote:

> Hi!
> Not sure if it’s the same but could you try picking up the fix from the
> release branch and confirming that it solves the problem?
>
> If it does we may consider a quick bug fix release.
>
> Cheers
> Gyula
>
> On Wed, 18 Oct 2023 at 18:09, Tony Chen  wrote:
>
>> Hi Flink Community,
>>
>> Most of the Flink applications run on 1.14 at my company. After upgrading
>> the Flink Operator to 1.6, we've seen many jobmanager pods show
>> "JobManagerDeploymentStatus: MISSING".
>>
>> Here are some logs from the operator pod on one of our Flink applications:
>>
>> 2023-10-18 02:02:40,823 o.a.f.k.o.l.AuditUtils [INFO
>> ][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
>> SAVEPOINTERROR | Savepoint failed for savepointTriggerNonce: null
>> ...
>> 2023-10-18 02:02:40,883 o.a.f.k.o.l.AuditUtils [INFO
>> ][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
>> CLUSTERDEPLOYMENTEXCEPTION | Status have been modified externally in
>> version 17447422864 Previous: 
>> ...
>> 2023-10-18 02:02:40,919 i.j.o.p.e.ReconciliationDispatcher
>> [ERROR][nemo/nemo-streaming-users-identi-updates] Error during
>> event processing ExecutionScope{ resource id:
>> ResourceID{name='nemo-streaming-users-identi-updates', namespace='nemo'},
>> version: 17447420285} failed.
>> ...
>> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
>> org.apache.flink.kubernetes.operator.ex

Re: Flink kubernets operator delete HA metadata after resuming from suspend

2023-10-18 Thread Tony Chen
tate : suspended ->
>>>>> running]), starting reconciliation.
>>>>> 2023-09-11 06:02:07,649 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource 
>>>>> is
>>>>> being upgraded
>>>>> 2023-09-11 06:02:07,649 o.a.f.k.o.r.d.ApplicationReconciler [INFO
>>>>> ][rec-job/rec-job] Deleting deployment with terminated application before
>>>>> new deployment
>>>>> 2023-09-11 06:02:07,649 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Deleting cluster with Foreground propagation
>>>>> 2023-09-11 06:02:07,649 o.a.f.k.o.s.NativeFlinkService [INFO
>>>>> ][rec-job/rec-job] Deleting JobManager deployment and HA metadata.
>>>>> 2023-09-11 06:02:07,691 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Waiting for cluster shutdown...
>>>>> 2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Cluster shutdown completed.
>>>>> 2023-09-11 06:02:07,763 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Deleting Kubernetes HA metadata
>>>>> 2023-09-11 06:02:07,820 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Waiting for cluster shutdown...
>>>>> 2023-09-11 06:02:07,831 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Cluster shutdown completed.
>>>>> 2023-09-11 06:02:07,975 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource 
>>>>> is
>>>>> being upgraded
>>>>> 2023-09-11 06:02:07,987 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Event  | Info| SUBMIT  | Starting
>>>>> deployment
>>>>> 2023-09-11 06:02:07,987 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Deploying application cluster requiring last-state from
>>>>> HA metadata
>>>>> 2023-09-11 06:02:07,999 o.a.f.k.o.c.FlinkDeploymentController
>>>>> [ERROR][rec-job/rec-job] Flink recovery failed
>>>>> 2023-09-11 06:02:08,012 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Event  | Warning | RESTOREFAILED   | HA metadata 
>>>>> not
>>>>> available to restore from last state. It is possible that the job has
>>>>> finished or terminally failed, or the configmaps have been deleted. Manual
>>>>> restore required.
>>>>> 2023-09-11 06:02:08,099 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Status | Error   | UPGRADING   |
>>>>> {"type":"org.apache.flink.kubernetes.operator.exception.RecoveryFailureException","message":"HA
>>>>> metadata not available to restore from last state. It is possible that the
>>>>> job has finished or terminally failed, or the configmaps have been 
>>>>> deleted.
>>>>> Manual restore required.","additionalMetadata":{},"throwableList":[]}
>>>>> 2023-09-11 06:02:08,193 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Status | Info| UPGRADING   | The resource 
>>>>> is
>>>>> being upgraded
>>>>> 2023-09-11 06:02:08,218 o.a.f.k.o.l.AuditUtils [INFO
>>>>> ][rec-job/rec-job] >>> Event  | Info| SUBMIT  | Starting
>>>>> deployment
>>>>> 2023-09-11 06:02:08,218 o.a.f.k.o.s.AbstractFlinkService [INFO
>>>>> ][rec-job/rec-job] Deploying application cluster requiring last-state from
>>>>> HA metadata
>>>>> 2023-09-11 06:02:08,228 o.a.f.k.o.c.FlinkDeploymentController
>>>>> [ERROR][rec-job/rec-job] Flink recovery failed
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> * -- *“This message contains confidential
>>>>> information/commercial secret. If you are not the intended addressee of
>>>>> this message you may not copy, save, print or forward it to any third 
>>>>> party
>>>>> and you are kindly requested to destroy this message and notify the sender
>>>>> thereof by email.
>>>>> Данное сообщение содержит конфиденциальную информацию/информацию,
>>>>> являющуюся коммерческой тайной. Если Вы не являетесь надлежащим адресатом
>>>>> данного сообщения, Вы не вправе копировать, сохранять, печатать или
>>>>> пересылать его каким либо иным лицам. Просьба уничтожить данное сообщение 
>>>>> и
>>>>> уведомить об этом отправителя электронным письмом.”
>>>>>
>>>>

-- 


Tony Chen

Software Engineer

Menlo Park, CA



Flink Operator 1.6 causes JobManagerDeploymentStatus: MISSING

2023-10-18 Thread Tony Chen
Hi Flink Community,

Most of the Flink applications run on 1.14 at my company. After upgrading
the Flink Operator to 1.6, we've seen many jobmanager pods show
"JobManagerDeploymentStatus: MISSING".

Here are some logs from the operator pod on one of our Flink applications:

2023-10-18 02:02:40,823 o.a.f.k.o.l.AuditUtils [INFO
][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
SAVEPOINTERROR | Savepoint failed for savepointTriggerNonce: null
...
2023-10-18 02:02:40,883 o.a.f.k.o.l.AuditUtils [INFO
][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning |
CLUSTERDEPLOYMENTEXCEPTION | Status have been modified externally in
version 17447422864 Previous: 
...
2023-10-18 02:02:40,919 i.j.o.p.e.ReconciliationDispatcher
[ERROR][nemo/nemo-streaming-users-identi-updates] Error during event
processing ExecutionScope{ resource id:
ResourceID{name='nemo-streaming-users-identi-updates', namespace='nemo'},
version: 17447420285} failed.
...
org.apache.flink.kubernetes.operator.exception.ReconciliationException:
org.apache.flink.kubernetes.operator.exception.StatusConflictException:
Status have been modified externally in version 17447422864 Previous:

...
2023-10-18 02:03:03,273 o.a.f.k.o.o.d.ApplicationObserver
[ERROR][nemo/nemo-streaming-users-identi-updates] Missing JobManager
deployment
...
2023-10-18 02:03:03,295 o.a.f.k.o.l.AuditUtils [INFO
][nemo/nemo-streaming-users-identi-updates] >>> Event | Warning | MISSING |
Missing JobManager deployment
2023-10-18 02:03:03,295 o.a.f.c.Configuration [WARN
][nemo/nemo-streaming-users-identi-updates] Config uses deprecated
configuration key 'high-availability' instead of proper key
'high-availability.type'


This seems related to this email thread:
https://www.mail-archive.com/user@flink.apache.org/msg51439.html. However,
I believe that we're not seeing the HA metadata getting deleted.

What could cause the JobManagerDeploymentStatus to be MISSING?

Thanks,
Tony


Real-time task blocking problem

2023-10-13 Thread rui chen
After a task restart on our Flink 1.13 deployment, Kafka consumption dropped to
zero. Has anyone encountered this?


Problems with the state.backend.fs.memory-threshold parameter

2023-10-13 Thread rui chen
We found that for some tasks the JM memory kept increasing. After setting
state.backend.fs.memory-threshold to 0, the JM memory no longer grows, but this
may produce many small files. Does the community have any optimization plan for
this area?
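
For reference, the setting being discussed, as it would appear in flink-conf.yaml
(a sketch; 0 is simply the value tried above, not a general recommendation):

# State smaller than this threshold is stored inline in the checkpoint metadata
# on the JM; lowering it reduces JM heap pressure at the cost of writing more
# small state files to the checkpoint storage.
state.backend.fs.memory-threshold: 0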


Re: Cannot find metata file metadats in directory

2023-10-13 Thread rui chen
After the task restarted several times, we found that the checkpoint it depended
on had been deleted. I checked the HDFS audit log and found that the deletion
request came from the JM.
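
For context, the retention-related settings in flink-conf.yaml that control how
many completed checkpoints the JM keeps and whether they survive cancellation
look roughly like this (values are illustrative, not a diagnosis of the issue
above):

state.checkpoints.num-retained: 3
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION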

On Sat, Sep 30, 2023 at 5:10 PM Hangxiang Yu  wrote:

> Hi,
> How did you point out the checkpoint path you restored from ?
>
> Seems that you are trying to restore from a not completed or failed
> checkpoint.
>
> On Thu, Sep 28, 2023 at 6:09 PM rui chen  wrote:
>
> > When we use 1.13.2,we have the following error:
> > FileNotFoundException: Cannot find metata file metadats in directory
> > 'hdfs://xx/f408dbe327f9e5053e76d7b5323d6e81/chk-173'.
> >
>
>
> --
> Best,
> Hangxiang.
>


[DISCUSS][FLINK-33240] Document deprecated options as well

2023-10-11 Thread Zhanghao Chen
Hi Flink users and developers,

Currently, Flink does not generate documentation for deprecated options. This might
confuse users when upgrading from an older version of Flink: they have to either
carefully read the release notes or check the source code for guidance on
deprecated options.

I propose to document deprecated options as well, with a "(deprecated)" tag 
placed at the beginning of the option description to highlight the deprecation 
status [1].

Looking forward to your feedback on this.

[1] https://issues.apache.org/jira/browse/FLINK-33240

Best,
Zhanghao Chen


Re: Metric to capture decoding failure in flink sources

2023-10-10 Thread Mason Chen
Hi Prateek,

I agree, the reader should ideally expose the context to record metrics
about deserialization. One option is to defer deserialization to another
operator, say a RichMapFunction that has access to the RuntimeContext.

Best,
Mason
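
A rough sketch of that approach (the Event type and the decode() body are
placeholders, not from the original code; the source would emit raw byte[]):

import java.io.IOException;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

// Decoding happens downstream of the source, where the RuntimeContext
// (and therefore the metric group) is available regardless of the source type.
public class DecodingFlatMap extends RichFlatMapFunction<byte[], Event> {

    private transient Counter decodingFailures;

    @Override
    public void open(Configuration parameters) {
        decodingFailures = getRuntimeContext().getMetricGroup().counter("decodingFailures");
    }

    @Override
    public void flatMap(byte[] raw, Collector<Event> out) {
        try {
            out.collect(decode(raw));
        } catch (IOException e) {
            decodingFailures.inc();   // count the failure and drop the record
        }
    }

    private Event decode(byte[] raw) throws IOException {
        // placeholder for the actual deserialization logic
        return Event.parse(raw);
    }
}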

On Mon, Oct 9, 2023 at 12:42 PM Prateek Kohli 
wrote:

> Hi,
>
> I need to get the difference between records which are collected by the
> source and the records which are emitted.
> For eg - If deserialization fails while reading records from kafka, in
> that case I want to expose the difference between records collected from
> Kafka Broker and records emitted from Kafka operator after deserialization
> as a metric.
>
> But I think flink does not provide any such metrics.
>
> In Kafka Source I can have a workaround to get this metric:
>
> I can override the open method from KafkaRecordDeserializationSchema where
> a metric can be added to show decoding failures:
>
> @Override
> public void open(DeserializationSchema.InitializationContext context)
> throws Exception {
> context.getMetricGroup().gauge("decodingFailures", new Gauge<Integer>()
> {
> @Override
> public Integer getValue()
> {
> return decodingFailures;
> }
> });
> }
>
> and at the time of deserialization I can increment that counter as below:
>
> @Override
> public void deserialize(ConsumerRecord<byte[], byte[]> record,
> Collector<T> out)
> {
> try
> {
> //deserialize
> }
> catch (IOException | MMException e)
> {
> logger.error(String.format("Error received while decoding, in
> partition [%d] for topic [%s] at offset [%d]: %s",
> partition, topic, offset, e.toString()));
>
> decodingFailures++;
> }
>
> *But there is no such way to implement this in FileSource, as
> SimpleStreamFormat/Reader does not provide access to Context in any way.*
>
> *Is there any way I can get this metric in both File & Kafka Collector or
> any generic way to get this agnostic to what collector is being used?*
>
> Regards,
> Prateek Kohli
>


Re: observedGeneration field in FlinkDeployment

2023-10-09 Thread Tony Chen
I think that a FLIP JIRA should be created to add an `observedGeneration`
field to the spec. When I look at other kubernetes APIs, I see the
`observedGeneration` field in many of them:
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/
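
In the meantime, a rough way to approximate such a check from the outside
(deployment name is a placeholder and jq is assumed to be available; the field
paths come from the discussion quoted below) could be:

kubectl get flinkdeployment my-app -o json | jq '{
  generation: .metadata.generation,
  lastReconciledGeneration: (.status.reconciliationStatus.lastReconciledSpec
      | fromjson | .resource_metadata.metadata.generation)
}'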

On Mon, Sep 11, 2023 at 11:51 AM Tony Chen  wrote:

> I think it should be fine for now. I can compare the following 2 fields to
> check if the spec has changed:
>
>- metadata.generation: 4
>- status.reconciliationStatus.lastStableSpec: '{"resource_metadata":
>{"metadata": {"generation": 2}'
>
>
> On Sat, Sep 9, 2023 at 2:48 AM Gyula Fóra  wrote:
>
>> Actually, I just realized, the last fully reconciled spec generation
>> should be written into a metadata JSON inside the lastReconciledSpec. So
>> this is already available.
>>
>> For example:
>> lastReconciledSpec: '{"spec":{...},"resource_metadata":{"apiVersion":"
>> flink.apache.org/v1beta1
>> ","metadata":{"generation":2},"firstDeployment":true}}'
>>
>> It's a bit hidden but it should do the trick :)
>> We could discuss moving this to a more standardized status field if you
>> think that's worth the effort.
>>
>> Gyula
>>
>> On Sat, Sep 9, 2023 at 7:04 AM Gyula Fóra  wrote:
>>
>>> Hi!
>>> The lastReconciledSpec field serves similar purpose . We also use the
>>> generation in parts of the logic but not generically as observed generation
>>> .
>>>
>>> Could you give an example where this would be useful in addition to what
>>> we already have?
>>>
>>> Thanks
>>> Gyula
>>>
>>> On Sat, 9 Sep 2023 at 02:17, Tony Chen  wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> I noticed that there is no status.observedGeneration field in the
>>>> FlinkDeployment spec. The closest field to this is
>>>> status.reconciliationStatus.lastReconciledSpec. Are there plans to add the
>>>> observedGeneration field in the spec? This field will be helpful in
>>>> detecting changes in the FlinkDeployment spec.
>>>>
>>>> Thanks,
>>>> Tony
>>>>
>>>> --
>>>>
>>>> <http://www.robinhood.com/>
>>>>
>>>> Tony Chen
>>>>
>>>> Software Engineer
>>>>
>>>> Menlo Park, CA
>>>>
>>>> Don't copy, share, or use this email without permission. If you
>>>> received it by accident, please let us know and then delete it right away.
>>>>
>>>
>
> --
>
> <http://www.robinhood.com/>
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>


Re: How/where to check the operator id

2023-10-06 Thread Chen Zhanghao
Hi Megh,

Unfortunately, Flink currently does not expose operator IDs via the REST API, nor
does it log them unless some operators' state is missing during state recovery,
since operator IDs are mostly used internally. Could you elaborate a bit on why you
need the operator ID info? Maybe you can create a JIRA ticket to expose the info
then.

Best,
Zhanghao Chen

From: megh vidani 
Sent: Friday, October 6, 2023 14:20
To: user@flink.apache.org 
Subject: How/where to check the operator id

Hello community,

We're using the uid method on all the stateful operators to assign custom
operator IDs.

However, I don't see any reference to these operator IDs anywhere in the job
logs, the Flink UI, or even the checkpoint details.

Do I need to look into some other place?

Thanks,
Megh
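
For context, a minimal sketch of how such IDs are assigned in the DataStream API
(the topology and names below are made up for illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        return value.toUpperCase();
                    }
                })
                .uid("normalize-map")   // stable ID, hashed into the internal operator ID
                .name("normalize")      // display name shown in the UI and logs
                .print();

        env.execute("uid-example");
    }
}

The string passed to uid() is hashed into the operator ID that checkpoint and
savepoint state is keyed by, which is why the ID itself does not show up
verbatim in the UI or logs.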


Re: Rolling back a bad deployment of FlinkDeployment on kubernetes

2023-10-06 Thread Tony Chen
So, I was able to get the rollback to work after I changed my upgradeMode
to *last-state*. Previously, my upgradeMode was *savepoint*, and when I
deployed a bad commit, the jobmanager-leader configmap would get deleted.
Once I changed the upgradeMode to *last-state*, the configmap was retained
when I deployed a bad commit, and the operator was able to roll back the
spec after the timeout.
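
For reference, a rough sketch of the relevant pieces of such a setup (name,
image and version are placeholders; the rollback flag can also live in the
operator's default configuration instead of the resource):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-app                            # placeholder
spec:
  image: my-registry/my-app:1.2.3         # placeholder
  flinkVersion: v1_17
  flinkConfiguration:
    kubernetes.operator.deployment.rollback.enabled: "true"
  job:
    jarURI: local:///opt/flink/usrlib/my-app.jar
    upgradeMode: last-state               # keeps HA metadata available for rollback
    state: running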

On Thu, Oct 5, 2023 at 3:45 PM Gyula Fóra  wrote:

> Hi Tony!
>
> There are still a few corner cases when the operator cannot upgrade /
> rollback deployments due to the loss of HA metadata (and with that
> checkpoint information).
>
> Most of these issues are not related to the operator logic directly but to
> how Flink handles certain failures and are related to:
>
> https://issues.apache.org/jira/browse/FLINK-30444 and
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-360%3A+Merging+the+ExecutionGraphInfoStore+and+the+JobResultStore+into+a+single+component+CompletedJobStore
>
> Rollbacks are designed to allow automatic fallback to the last stable
> spec, but the mechanism doesn't work in these corner cases (in the same way
> spec upgrades also dont)
>
> I hope this helps to understand the problem.
> The solution in these cases is to manually recover the job from the last
> checkpoint/savepoint.
>
> Cheers,
> Gyula
>
>
> On Thu, Oct 5, 2023 at 7:56 PM Tony Chen  wrote:
>
>> I tried this out with operator version 1.4 and it didn't work for me. I
>> noticed that when I was deploying a bad version, the Kubernetes HA metadata
>> and configmaps were deleted:
>>
>> [m [33m2023-10-05 14:52:17,493 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
>> ][flink-testing-service/flink-testing-service] >>> Event | Info |
>> SPECCHANGED | UPGRADE change(s) detected
>> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo,job.initialSavepointPath=s3a://robinhood-prod-flink/flink-testing-service/savepoints/savepoint-b832ef-05b185cb5800]
>> differs from
>> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob,job.initialSavepointPath=]),
>> starting reconciliation.
>> ...
>> [m [33m2023-10-05 14:52:51,054 [m [36mo.a.f.k.o.s.AbstractFlinkService [m
>> [32m[INFO ][flink-testing-service/flink-testing-service] Cluster shutdown
>> completed.
>> [m [33m2023-10-05 14:52:51,054 [m [36mo.a.f.k.o.s.AbstractFlinkService [m
>> [32m[INFO ][flink-testing-service/flink-testing-service] Deleting
>> Kubernetes HA metadata
>> [m [33m2023-10-05 14:52:51,196 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
>> ][flink-testing-service/flink-testing-service] >>> Status | Info |
>> UPGRADING | The resource is being upgraded
>>
>>
>>
>> Eventually, the rollback fails because the HA metadata is missing:
>>
>> [m [33m2023-10-05 14:58:16,119 [m
>> [36mo.a.f.k.o.r.d.AbstractFlinkResourceReconciler [m [33m[WARN
>> ][flink-testing-service/flink-testing-service] Rollback is not possible due
>> to missing HA metadata
>>
>>
>>
>> Besides setting kubernetes.operator.deployment.rollback.enabled: true,
>> is there anything else that I need to configure?
>>
>> On Thu, Oct 5, 2023 at 10:35 AM Tony Chen 
>> wrote:
>>
>>> I just saw this experimental feature in the documentation:
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#application-upgrade-rollbacks-experimental
>>>
>>> I'm guessing this is the only way to automate rollbacks for now.
>>>
>>> On Wed, Oct 4, 2023 at 3:25 PM Tony Chen 
>>> wrote:
>>>
>>>> Hi Flink Community,
>>>>
>>>> I am currently running Apache flink-kubernetes-operator on our
>>>> kubernetes clusters, and I have Flink applications that are deployed using
>>>> the FlinkDeployment Custom Resources (CR). I am trying to automate the
>>>> process of rollbacks and I am running into some issues.
>>>>
>>>> I was testing out a bad deployment where the jobmanager never becomes
>>>> healthy. I simulated this bad deployment by creating a Flink image with a
>>>> bug in it. I see in the operator logs that the jobmanager is unhealthy:
>>>>
>>>> [m [33m2023-10-02 22:14:34,874 [m
>>>> [36mo.a.f.k.o.r.d.AbstractFlinkResourceReconciler [m [32m[INFO
>>>> ][flink-testing-service/flink-testing-service] UPGRADE change(s) detected
>>>> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo]
>>>> differs from
>>>> FlinkDeploymentSpec[job.entr

Re: Rolling back a bad deployment of FlinkDeployment on kubernetes

2023-10-05 Thread Tony Chen
I tried this out with operator version 1.4 and it didn't work for me. I
noticed that when I was deploying a bad version, the Kubernetes HA metadata
and configmaps were deleted:

2023-10-05 14:52:17,493 o.a.f.k.o.l.AuditUtils [INFO
][flink-testing-service/flink-testing-service] >>> Event | Info |
SPECCHANGED | UPGRADE change(s) detected
(FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo,job.initialSavepointPath=s3a://robinhood-prod-flink/flink-testing-service/savepoints/savepoint-b832ef-05b185cb5800]
differs from
FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob,job.initialSavepointPath=]),
starting reconciliation.
...
2023-10-05 14:52:51,054 o.a.f.k.o.s.AbstractFlinkService
[INFO ][flink-testing-service/flink-testing-service] Cluster shutdown
completed.
2023-10-05 14:52:51,054 o.a.f.k.o.s.AbstractFlinkService
[INFO ][flink-testing-service/flink-testing-service] Deleting
Kubernetes HA metadata
2023-10-05 14:52:51,196 o.a.f.k.o.l.AuditUtils [INFO
][flink-testing-service/flink-testing-service] >>> Status | Info |
UPGRADING | The resource is being upgraded



Eventually, the rollback fails because the HA metadata is missing:

2023-10-05 14:58:16,119 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [WARN
][flink-testing-service/flink-testing-service] Rollback is not possible due
to missing HA metadata



Besides setting kubernetes.operator.deployment.rollback.enabled: true, is
there anything else that I need to configure?

On Thu, Oct 5, 2023 at 10:35 AM Tony Chen  wrote:

> I just saw this experimental feature in the documentation:
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#application-upgrade-rollbacks-experimental
>
> I'm guessing this is the only way to automate rollbacks for now.
>
> On Wed, Oct 4, 2023 at 3:25 PM Tony Chen  wrote:
>
>> Hi Flink Community,
>>
>> I am currently running Apache flink-kubernetes-operator on our kubernetes
>> clusters, and I have Flink applications that are deployed using the
>> FlinkDeployment Custom Resources (CR). I am trying to automate the process
>> of rollbacks and I am running into some issues.
>>
>> I was testing out a bad deployment where the jobmanager never becomes
>> healthy. I simulated this bad deployment by creating a Flink image with a
>> bug in it. I see in the operator logs that the jobmanager is unhealthy:
>>
>> [m [33m2023-10-02 22:14:34,874 [m
>> [36mo.a.f.k.o.r.d.AbstractFlinkResourceReconciler [m [32m[INFO
>> ][flink-testing-service/flink-testing-service] UPGRADE change(s) detected
>> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo]
>> differs from
>> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob]),
>> starting reconciliation.
>> ...
>> [m [33m2023-10-02 22:15:09,001 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
>> ][flink-testing-service/flink-testing-service] >>> Status | Info |
>> UPGRADING | The resource is being upgraded
>> ...
>> [m [33m2023-10-02 22:17:23,911 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
>> ][flink-testing-service/flink-testing-service] >>> Status | Error |
>> DEPLOYED |
>> {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"back-off
>> 20s restarting failed container=flink-main-container
>> pod=flink-testing-service-749dd97c75-4w9ps_flink-testing-service(6db1adb3-4ca4-4924-a8c3-57a417818d85)","additionalMetadata":{"reason":"CrashLoopBackOff"},"throwableList":[]}
>>
>> ...
>> [m [33m2023-10-02 22:17:33,576 [m [36mo.a.f.k.o.o.d.ApplicationObserver
>> [m [32m[INFO ][flink-testing-service/flink-testing-service] Observing
>> JobManager deployment. Previous status: ERROR
>>
>>
>> What I do next is I change the spec of the FlinkDeployment so that it
>> uses a Flink image that is healthy. The operator shows that the spec has
>> changed:
>>
>> [m [33m2023-10-02 22:45:37,445 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
>> ][flink-testing-service/flink-testing-service] >>> Event | Info |
>> SPECCHANGED | UPGRADE change(s) detected
>> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob,job.initialSavepointPath=s3a://robinhood-dev-core-flink-states/flink-testing-service/savepoints/savepoint-329a14-2f8264206b1d]
>> differs from
>> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo,job.initialSavepointPath=s3a://robinhood-dev-core-flink-

Re: Rolling back a bad deployment of FlinkDeployment on kubernetes

2023-10-05 Thread Tony Chen
I just saw this experimental feature in the documentation:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#application-upgrade-rollbacks-experimental

I'm guessing this is the only way to automate rollbacks for now.

On Wed, Oct 4, 2023 at 3:25 PM Tony Chen  wrote:

> Hi Flink Community,
>
> I am currently running Apache flink-kubernetes-operator on our kubernetes
> clusters, and I have Flink applications that are deployed using the
> FlinkDeployment Custom Resources (CR). I am trying to automate the process
> of rollbacks and I am running into some issues.
>
> I was testing out a bad deployment where the jobmanager never becomes
> healthy. I simulated this bad deployment by creating a Flink image with a
> bug in it. I see in the operator logs that the jobmanager is unhealthy:
>
> [m [33m2023-10-02 22:14:34,874 [m
> [36mo.a.f.k.o.r.d.AbstractFlinkResourceReconciler [m [32m[INFO
> ][flink-testing-service/flink-testing-service] UPGRADE change(s) detected
> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo]
> differs from
> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob]),
> starting reconciliation.
> ...
> [m [33m2023-10-02 22:15:09,001 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][flink-testing-service/flink-testing-service] >>> Status | Info |
> UPGRADING | The resource is being upgraded
> ...
> [m [33m2023-10-02 22:17:23,911 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][flink-testing-service/flink-testing-service] >>> Status | Error |
> DEPLOYED |
> {"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"back-off
> 20s restarting failed container=flink-main-container
> pod=flink-testing-service-749dd97c75-4w9ps_flink-testing-service(6db1adb3-4ca4-4924-a8c3-57a417818d85)","additionalMetadata":{"reason":"CrashLoopBackOff"},"throwableList":[]}
>
> ...
> [m [33m2023-10-02 22:17:33,576 [m [36mo.a.f.k.o.o.d.ApplicationObserver [m
> [32m[INFO ][flink-testing-service/flink-testing-service] Observing
> JobManager deployment. Previous status: ERROR
>
>
> What I do next is I change the spec of the FlinkDeployment so that it uses
> a Flink image that is healthy. The operator shows that the spec has changed:
>
> [m [33m2023-10-02 22:45:37,445 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][flink-testing-service/flink-testing-service] >>> Event | Info |
> SPECCHANGED | UPGRADE change(s) detected
> (FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob,job.initialSavepointPath=s3a://robinhood-dev-core-flink-states/flink-testing-service/savepoints/savepoint-329a14-2f8264206b1d]
> differs from
> FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo,job.initialSavepointPath=s3a://robinhood-dev-core-flink-states/flink-testing-service/savepoints/savepoint-dc1077-134923759e30]),
> starting reconciliation.
>
>
> However, the Flink operator cannot reconcile this spec change, and the
> jobmanager is now permanently failing because it's still running the bad
> Flink image:
>
> [m [33m2023-10-02 22:45:37,461 [m [36mo.a.f.k.o.l.AuditUtils [m [32m[INFO
> ][flink-testing-service/flink-testing-service] >>> Event | Warning |
> UPGRADEFAILED | JobManager deployment is missing and HA data is not
> available to make stateful upgrades. It is possible that the job has
> finished or terminally failed, or the configmaps have been deleted. Manual
> restore required.
>
> I can simply delete this FlinkDeployment and redeploy with the healthy
> Flink image, but I would like to avoid manual restores if possible. Is it
> possible to recover by just changing the FlinkDeployment spec?
>
> Thanks,
> Tony
>
> --
>
> <http://www.robinhood.com/>
>
> Tony Chen
>
> Software Engineer
>
> Menlo Park, CA
>
> Don't copy, share, or use this email without permission. If you received
> it by accident, please let us know and then delete it right away.
>


Rolling back a bad deployment of FlinkDeployment on kubernetes

2023-10-04 Thread Tony Chen
Hi Flink Community,

I am currently running Apache flink-kubernetes-operator on our kubernetes
clusters, and I have Flink applications that are deployed using the
FlinkDeployment Custom Resources (CR). I am trying to automate the process
of rollbacks and I am running into some issues.

I was testing out a bad deployment where the jobmanager never becomes
healthy. I simulated this bad deployment by creating a Flink image with a
bug in it. I see in the operator logs that the jobmanager is unhealthy:

2023-10-02 22:14:34,874 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO
][flink-testing-service/flink-testing-service] UPGRADE change(s) detected
(FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo]
differs from
FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob]),
starting reconciliation.
...
2023-10-02 22:15:09,001 o.a.f.k.o.l.AuditUtils [INFO
][flink-testing-service/flink-testing-service] >>> Status | Info |
UPGRADING | The resource is being upgraded
...
2023-10-02 22:17:23,911 o.a.f.k.o.l.AuditUtils [INFO
][flink-testing-service/flink-testing-service] >>> Status | Error |
DEPLOYED |
{"type":"org.apache.flink.kubernetes.operator.exception.DeploymentFailedException","message":"back-off
20s restarting failed container=flink-main-container
pod=flink-testing-service-749dd97c75-4w9ps_flink-testing-service(6db1adb3-4ca4-4924-a8c3-57a417818d85)","additionalMetadata":{"reason":"CrashLoopBackOff"},"throwableList":[]}

...
2023-10-02 22:17:33,576 o.a.f.k.o.o.d.ApplicationObserver
[INFO ][flink-testing-service/flink-testing-service] Observing
JobManager deployment. Previous status: ERROR


What I do next is I change the spec of the FlinkDeployment so that it uses
a Flink image that is healthy. The operator shows that the spec has changed:

2023-10-02 22:45:37,445 o.a.f.k.o.l.AuditUtils [INFO
][flink-testing-service/flink-testing-service] >>> Event | Info |
SPECCHANGED | UPGRADE change(s) detected
(FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJob,job.initialSavepointPath=s3a://robinhood-dev-core-flink-states/flink-testing-service/savepoints/savepoint-329a14-2f8264206b1d]
differs from
FlinkDeploymentSpec[job.entryClass=com.robinhood.flink.chaos.StreamingSumByKeyJo,job.initialSavepointPath=s3a://robinhood-dev-core-flink-states/flink-testing-service/savepoints/savepoint-dc1077-134923759e30]),
starting reconciliation.


However, the Flink operator cannot reconcile this spec change, and the
jobmanager is now permanently failing because it's still running the bad
Flink image:

2023-10-02 22:45:37,461 o.a.f.k.o.l.AuditUtils [INFO
][flink-testing-service/flink-testing-service] >>> Event | Warning |
UPGRADEFAILED | JobManager deployment is missing and HA data is not
available to make stateful upgrades. It is possible that the job has
finished or terminally failed, or the configmaps have been deleted. Manual
restore required.

I can simply delete this FlinkDeployment and redeploy with the healthy
Flink image, but I would like to avoid manual restores if possible. Is it
possible to recover by just changing the FlinkDeployment spec?

Thanks,
Tony


Re: Custom Prometheus metrics disappeared in 1.16.2 => 1.17.1 upgrade

2023-10-03 Thread Mason Chen
Hi Javier,

Is there a particular reason why you aren't leveraging Flink's metric API? It
seems that functionality was internal to the PrometheusReporter
implementation, and your use case should have continued working if it had
depended on Flink's metric API.

Best,
Mason
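
For what it's worth, a minimal sketch of a metric registered through Flink's
metric API (function and metric names are illustrative); any configured
reporter, including the PrometheusReporter, will pick it up:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter processed;

    @Override
    public void open(Configuration parameters) {
        processed = getRuntimeContext().getMetricGroup().counter("myCustomProcessed");
    }

    @Override
    public String map(String value) {
        processed.inc();   // custom metric exposed alongside Flink's own metrics
        return value;
    }
}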

On Thu, Sep 28, 2023 at 2:51 AM Javier Vegas  wrote:

> Thanks! I saw the first change but missed the third one, which most likely
> explains my problem: the metrics I was sending with the twitter/finagle
> statsReceiver probably ended up in the singleton default registry and were
> exposed by Flink with all the other Flink metrics, but now that Flink uses
> its own registry I have no idea where my custom metrics end up.
>
>
> On Wed, Sep 27, 2023 at 4:56, Kenan Kılıçtepe
> () wrote:
> >
> > Have you checked the metric  changes in 1.17.
> >
> > From release notes 1.17:
> >
> https://nightlies.apache.org/flink/flink-docs-master/release-notes/flink-1.17/
> >
> > Metric Reporters #
> > Only support reporter factories for instantiation #
> > FLINK-24235 #
> > Configuring reporters by their class is no longer supported. Reporter
> implementations must provide a MetricReporterFactory, and all
> configurations must be migrated to such a factory.
> >
> > UseLogicalIdentifier makes datadog consider metric as custom #
> > FLINK-30383 #
> > The Datadog reporter now adds a “flink.” prefix to metric identifiers if
> “useLogicalIdentifier” is enabled. This is required for these metrics to be
> recognized as Flink metrics, not custom ones.
> >
> > Use separate Prometheus CollectorRegistries #
> > FLINK-30020 #
> > The PrometheusReporters now use a separate CollectorRegistry for each
> reporter instance instead of the singleton default registry. This generally
> shouldn’t impact setups, but it may break code that indirectly interacts
> with the reporter via the singleton instance (e.g., a test trying to assert
> what metrics are reported).
> >
> >
> >
> > On Wed, Sep 27, 2023 at 11:11 AM Javier Vegas  wrote:
> >>
> >> I implemented some custom Prometheus metrics that were working on
> >> 1.16.2, with my configuration
> >>
> >> metrics.reporter.prom.factory.class:
> >> org.apache.flink.metrics.prometheus.PrometheusReporterFactory
> >> metrics.reporter.prom.port: 
> >>
> >> I could see both Flink metrics and my custom metrics on port  of
> >> my task managers
> >>
> >> After upgrading to 1.17.1, using the same configuration, I can see
> >> only the FLink metrics on port  of the task managers,
> >> the custom metrics are getting lost somewhere.
> >>
> >> The release notes for 1.17 mention
> >> https://issues.apache.org/jira/browse/FLINK-24235
> >> that removes instantiating reporters by name and forces using a
> >> factory, which I was already doing in 1.16.2. Do I need to do
> >> anything extra after those changes so my metrics are aggregated with
> >> the Flink ones?
> >>
> >> I am also seeing this error message on application startup (which I
> >> was already seeing in 1.16.2): "Multiple implementations of the same
> >> reporter were found in 'lib' and/or 'plugins' directories for
> >> org.apache.flink.metrics.prometheus.PrometheusReporterFactory. It is
> >> recommended to remove redundant reporter JARs to resolve used
> >> versions' ambiguity." Could that also explain the missing metrics?
> >>
> >> Thanks,
> >>
> >> Javier Vegas
>


Cannot find metata file metadats in directory

2023-09-28 Thread rui chen
When we use 1.13.2, we get the following error:
FileNotFoundException: Cannot find metata file metadats in directory
'hdfs://xx/f408dbe327f9e5053e76d7b5323d6e81/chk-173'.


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-28 Thread rui chen
Hi Feng,

Are you using OpenJDK or Oracle JDK?

Best,
rui

On Wed, Sep 27, 2023 at 8:22 PM rui chen  wrote:

> hi Feng,
>
> Thanks for your reply. We are on JDK 8u192, which may be the cause; I found
> a JDK issue: https://bugs.openjdk.org/browse/JDK-8215355.
>
> Best,
> rui
>
> Feng Jin  于2023年9月27日周三 20:09写道:
>
>> hi Rui,
>>
>> Which version of JDK are you using?
>>
>> This issue could potentially be a bug in the JDK version.
>>
>> If you are using JDK 8, you can try using OpenJDK 8u265 as a possible
>> solution.
>>
>>
>> Best,
>> Feng
>>
>>
>> On Wed, Sep 27, 2023 at 8:08 PM rui chen  wrote:
>>
>>>
>>>
>>> rui chen  于2023年9月27日周三 19:32写道:
>>>
>>>> hi Feng,
>>>>
>>>> Thank you for your reply,We observed the GC situation, there is no
>>>> change before and after replacement, several tasks on our line using
>>>> jemalloc have appeared stuck, after removing jemalloc, no stuck situation
>>>> has been found.
>>>>
>>>> Best,
>>>> rui
>>>>
>>>> Feng Jin  于2023年9月27日周三 19:19写道:
>>>>
>>>>>
>>>>> hi rui,
>>>>>
>>>>> In general, checkpoint timeouts are typically associated with the
>>>>> job's processing performance. When using jemalloc, performance degradation
>>>>> is generally not observed.
>>>>>
>>>>> It is advisable to analyze whether the job's garbage collection (GC)
>>>>> has become more frequent.
>>>>>
>>>>>
>>>>> Best,
>>>>> Feng
>>>>>
>>>>>
>>>>> On Mon, Sep 25, 2023 at 1:21 PM rui chen  wrote:
>>>>>
>>>>>> After using the jemalloc memory allocator for a period of time,
>>>>>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>>>>>> flink version:1.13.2, jiemalloc version: 5.3.0
>>>>>>
>>>>>


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread rui chen
Hi Feng,

Thanks for your reply. We are on JDK 8u192, which may be the cause; I found a
JDK issue: https://bugs.openjdk.org/browse/JDK-8215355.

Best,
rui

On Wed, Sep 27, 2023 at 8:09 PM Feng Jin  wrote:

> hi Rui,
>
> Which version of JDK are you using?
>
> This issue could potentially be a bug in the JDK version.
>
> If you are using JDK 8, you can try using OpenJDK 8u265 as a possible
> solution.
>
>
> Best,
> Feng
>
>
> On Wed, Sep 27, 2023 at 8:08 PM rui chen  wrote:
>
>>
>>
>> rui chen  于2023年9月27日周三 19:32写道:
>>
>>> hi Feng,
>>>
>>> Thank you for your reply,We observed the GC situation, there is no
>>> change before and after replacement, several tasks on our line using
>>> jemalloc have appeared stuck, after removing jemalloc, no stuck situation
>>> has been found.
>>>
>>> Best,
>>> rui
>>>
>>> Feng Jin  于2023年9月27日周三 19:19写道:
>>>
>>>>
>>>> hi rui,
>>>>
>>>> In general, checkpoint timeouts are typically associated with the job's
>>>> processing performance. When using jemalloc, performance degradation is
>>>> generally not observed.
>>>>
>>>> It is advisable to analyze whether the job's garbage collection (GC)
>>>> has become more frequent.
>>>>
>>>>
>>>> Best,
>>>> Feng
>>>>
>>>>
>>>> On Mon, Sep 25, 2023 at 1:21 PM rui chen  wrote:
>>>>
>>>>> After using the jemalloc memory allocator for a period of time,
>>>>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>>>>> flink version:1.13.2, jiemalloc version: 5.3.0
>>>>>
>>>>


Re: After using the jemalloc memory allocator for a period of time, checkpoint timeout occurs and tasks are stuck

2023-09-27 Thread rui chen
On Wed, Sep 27, 2023 at 7:32 PM rui chen  wrote:

> hi Feng,
>
> Thank you for your reply. We observed the GC behaviour and there is no change
> before and after the switch. Several of our production tasks using jemalloc
> have gotten stuck; after removing jemalloc, no such hangs have been found.
>
> Best,
> rui
>
> Feng Jin  于2023年9月27日周三 19:19写道:
>
>>
>> hi rui,
>>
>> In general, checkpoint timeouts are typically associated with the job's
>> processing performance. When using jemalloc, performance degradation is
>> generally not observed.
>>
>> It is advisable to analyze whether the job's garbage collection (GC) has
>> become more frequent.
>>
>>
>> Best,
>> Feng
>>
>>
>> On Mon, Sep 25, 2023 at 1:21 PM rui chen  wrote:
>>
>>> After using the jemalloc memory allocator for a period of time,
>>> checkpoint timeout occurs and tasks are stuck. Who has encountered this?
>>> flink version:1.13.2, jiemalloc version: 5.3.0
>>>
>>


  1   2   3   4   5   6   7   >