Re: Flatmap operator in an Asynchronous call

2022-03-25 Thread Diwakar Jha
I'm not able to use async I/O directly because the file will not fit in memory. I
thought that flatMap would allow me to enrich/process records while
downloading instead of waiting for the whole file to be downloaded. The
solution works, but it's not scalable because I'm not able to use an
AsyncFunction inside flatMap.
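
For reference, a minimal sketch of the pattern suggested by Arvid below (a synchronous
flatMap that only splits each file into records, followed by Flink's Async I/O operator
for the enrichment) could look like the following in Java; the source, record format and
enrichment call are placeholders rather than the original code:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class FlatMapThenAsyncEnrichment {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source: each element stands for the content of one downloaded file.
        DataStream<String> files = env.fromElements("a,b,c", "d,e,f");

        // 1) Synchronous flatMap: only splits file content into individual records.
        DataStream<String> records = files
            .flatMap((FlatMapFunction<String, String>) (fileContent, out) -> {
                for (String record : fileContent.split(",")) {
                    out.collect(record);
                }
            })
            .returns(String.class);

        // 2) Asynchronous enrichment in a dedicated Async I/O operator.
        DataStream<String> enriched = AsyncDataStream.unorderedWait(
            records,
            new RichAsyncFunction<String, String>() {
                @Override
                public void asyncInvoke(String record, ResultFuture<String> resultFuture) {
                    CompletableFuture
                        .supplyAsync(() -> record + "-enriched") // replace with a real async client call
                        .thenAccept(r -> resultFuture.complete(Collections.singleton(r)));
                }
            },
            30_000, TimeUnit.MILLISECONDS, 100);

        enriched.print();
        env.execute("flatMap + async enrichment sketch");
    }
}

The splitting stays on the task thread, while the per-record enrichment no longer blocks
it and its results are handed back through the ResultFuture.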

On Wed, Mar 9, 2022 at 4:52 AM Arvid Heise  wrote:

> You can use flatMap to flatten and have an async I/O operator after it.
>
> On Wed, Mar 9, 2022 at 8:08 AM Diwakar Jha  wrote:
>
>> Thanks Gen, I will look into a customized Source and SplitEnumerator.
>>
>> On Mon, Mar 7, 2022 at 10:20 PM Gen Luo  wrote:
>>
>>> Hi Diwakar,
>>>
>>> An asynchronous flatMap function without the support of the framework
>>> can be problematic. You should not call collector.collect outside the main
>>> thread of the task, i.e. outside the flatMap method.
>>>
>>> I'd suggest using a customized Source instead to process the files,
>>> which uses a SplitEnumerator to discover the files and SourceReaders to
>>> read them. In this way, checkpoints can be triggered between two calls
>>> of pollNext, so you don't have to implement it asynchronously. It would be
>>> better if the readers only read the lines and the records are enriched in a
>>> following map function.
>>>
>>>
>>>
>>> On Tue, Mar 8, 2022 at 5:17 AM Diwakar Jha 
>>> wrote:
>>>
 Hello Everyone,

 I'm running a streaming application using Flink 1.11 and EMR 6.01. My
 use case is reading files from an S3 bucket, filtering the file contents (say,
 records), enriching each record, filtering the records, and outputting to a sink.
 I'm reading 6k files per 15 minutes, and the total number of records is 3
 billion per 15 minutes. I'm using a flatMap operator to convert each file into
 records and enrich the records in a synchronous call.

 *Problem* : My application fails to run (checkpoint timeout) if I add
 more filter criteria (operators). I suspect the system is not able to scale
 (CPU utilization is still at 20%) because of the synchronous call. I want to
 convert this flatMap into an asynchronous call using AsyncFunction. I was looking
 for something like AsyncCollector.collect

 https://nightlies.apache.org/flink/flink-docs-release-1.3/api/java/org/apache/flink/streaming/api/functions/async/AsyncFunction.html
 to complement my current synchronous implementation using flatMap, but it
 seems this is not available in Flink 1.11.

 *Question* :
 Could someone please help me with converting this flatMap operation to
 an asynchronous call?

 Please let me know if you have any questions.

 Best,

>>>


Re: flink-kubernetes-operator: Flink deployment stuck in scheduled state when increasing resource CPU above 1

2022-03-25 Thread Makas Tzavellas
Thanks everyone for the responses.

I checked with kubectl describe pods and noticed that the pod fails to start
due to insufficient resources. It is able to run only if I set the
jobManager to 1 CPU and the taskManager to at most 1.5 CPUs.

But it's good to know that it wasn't due to an incorrect configuration in
the deployment.

Thanks!

On Sat, Mar 26, 2022 at 12:49 PM Yang Wang  wrote:

> Could you please share the result of "kubectl describe pods" when getting
> stuck? It will be very useful to help to figure out the root cause.
>
> I guess it might be related to insufficient resources for minikube.
>
>
> Best,
> Yang
>
> Őrhidi Mátyás  wrote on Saturday, March 26, 2022 at 03:12:
>
>> It's worth checking the deployment->replica set->pod chain for error
>> message.
>>
>> On Fri, Mar 25, 2022, 19:49 Makas Tzavellas 
>> wrote:
>>
>>> Hi,
>>>
>>> I have been experimenting with flink-kubernetes-operator today and it's
>>> very cool. However, the Flink application will be stuck in scheduled state
>>> if I increase the CPU to anything above 1 for JobManager and TaskManager.
>>> It works fine if I keep the CPU to 1.
>>>
>>> I am testing the Flink deployment with Minikube running with 4 CPUs and
>>> 8GB RAM.
>>>
>>> Unfortunately, I am not sure what to look out for to figure out why it's
>>> stuck.
>>>
>>> Appreciate it if someone could help point me in the right direction.
>>>
>>> Thanks!
>>>
>>


Re: flink-kubernetes-operator: Flink deployment stuck in scheduled state when increasing resource CPU above 1

2022-03-25 Thread Yang Wang
Could you please share the result of "kubectl describe pods" when it gets
stuck? It would be very useful for figuring out the root cause.

I guess it might be related to insufficient resources for minikube.
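
For reference, commands along these lines can confirm a resource shortage and give
Minikube more room; the pod name and sizes are placeholders, not from this thread:

kubectl describe pod <jobmanager-or-taskmanager-pod-name>   # the Events section typically shows "Insufficient cpu"
kubectl get events --sort-by=.lastTimestamp
minikube start --cpus=6 --memory=8192                       # applies when the Minikube cluster is (re)created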


Best,
Yang

Őrhidi Mátyás  wrote on Saturday, March 26, 2022 at 03:12:

> It's worth checking the deployment->replica set->pod chain for error
> message.
>
> On Fri, Mar 25, 2022, 19:49 Makas Tzavellas 
> wrote:
>
>> Hi,
>>
>> I have been experimenting with flink-kubernetes-operator today and it's
>> very cool. However, the Flink application will be stuck in scheduled state
>> if I increase the CPU to anything above 1 for JobManager and TaskManager.
>> It works fine if I keep the CPU to 1.
>>
>> I am testing the Flink deployment with Minikube running with 4 CPUs and
>> 8GB RAM.
>>
>> Unfortunately, I am not sure what to look out for to figure out why it's
>> stuck.
>>
>> Appreciate it if someone could help point me in the right direction.
>>
>> Thanks!
>>
>


Re: "Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

2022-03-25 Thread Yang Wang
The root cause might be that the LoadBalancer does not really work in your
environment. We already have a ticket to track this [1] and will try to get
it resolved in the next release.

For now, could you please try adding
"-Dkubernetes.rest-service.exposed.type=NodePort" to your session and
submission commands?
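
For illustration, adapted from the commands quoted below with only the relevant option
added (keep the rest of your -D options unchanged):

./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    ...

./bin/flink run --target kubernetes-session \
    -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    ...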

Maybe you are also interested in the new flink-kubernetes-operator
project [2]. It should make it easier to run a Flink application on K8s.

[1]. https://issues.apache.org/jira/browse/FLINK-17231
[2]. https://github.com/apache/flink-kubernetes-operator

Best,
Yang

Burcu Gul POLAT EGRI  wrote on Friday, March 25, 2022 at 21:39:

> I am getting the following error when I try to execute the sample from the
> Flink documentation - Native Kubernetes.
>
> I have succeeded in executing the first command in the documentation by adding
> some extra parameters with the help of this post.
>
> user@local:~/flink-1.14.4$ ./bin/kubernetes-session.sh \
>
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
>
> -Dtaskmanager.memory.process.size=4096m \
>
> -Dkubernetes.taskmanager.cpu=2 \
>
> -Dtaskmanager.numberOfTaskSlots=4 \
>
> -Dresourcemanager.taskmanager-timeout=360 \
>
> -Dkubernetes.namespace=sdt-dproc-flink-test \
>
> -Dkubernetes.config.file=/home/devuser/.kube/config \
>
> -Dkubernetes.jobmanager.service-account=flink-service-account
>
> After executing above command, I have listed the new pod like below.
>
> user@local:~/flink-1.14.4$ kubectl get pods
>
> NAME                                             READY   STATUS    RESTARTS   AGE
> dproc-example-flink-cluster-id-68c79bf67-mwh52   1/1     Running   0          1m
>
> Then, I have executed the below command to submit example job.
>
> user@local:~/flink-1.14.4$ ./bin/flink run --target kubernetes-session \
>
> -Dkubernetes.service-account=flink-service-account \
>
> -Dkubernetes.cluster-id=dproc-example-flink-cluster-id \
>
> -Dkubernetes.namespace=sdt-dproc-flink-test \
>
> -Dkubernetes.config.file=/home/devuser/.kube/config
>
> examples/batch/WordCount.jar --input /home/user/sometexts.txt --output 
> /tmp/flinksample
>
> After a while, I received below logs:
>
> 2022-03-25 12:38:00,538 INFO  
> org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
> flink cluster dproc-example-flink-cluster-id successfully, JobManager Web 
> Interface: http://10.150.140.248:8081
>
>
>
> 
>
>  The program finished with the following exception:
>
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>
> at 
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>
> at 
> org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>
> at 
> org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>
> Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>
> at 
> org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
>
> at 
> org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:131)
>
> at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)
>
> at 
> org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)
>
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>
> at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>
> at 
> 

Re: The number of requested virtual cores for application master 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster.

2022-03-25 Thread Geng Biao
If you want to share a screenshot, I'd suggest uploading it to an image host and putting the link in the email. Normally, running yarn top will show the cluster status.
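
For reference, the YARN CLI commands for checking this look roughly as follows; the exact output depends on the Hadoop version:

yarn top               # interactive overview of cluster resources, queues and applications
yarn node -list -all   # lists NodeManagers and their state; with no RUNNING nodes, 0 vcores are available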

Best,
Biao

Get Outlook for iOS

From: Kaiqiang XIE谢开强 
Sent: Saturday, March 26, 2022 10:33:40 AM
To: user-zh@flink.apache.org 
Subject: Re:Re: The number of requested virtual cores for application master 1 
exceeds the maximum number of virtual cores 0 available in the Yarn Cluster.





This YARN status looks normal, doesn't it?

[inline image attachment - YARN status screenshot, not included in the archive]


At 2022-03-26 09:22:11, "胡伟华"  wrote:
>The error means the YARN cluster does not have enough available resources. You can check whether the YARN cluster is healthy.
>
>> On Mar 26, 2022, at 8:00 AM, Kaiqiang XIE谢开强  wrote:
>>
>> Hi all, I'd like to ask another question.
>> I deployed standalone Flink on a Mac. After running bin/flink run -m yarn-cluster 
>> $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar --port 8882, I got the message: The 
>> number of requested virtual cores for application master 1 exceeds the 
>> maximum number of virtual cores 0 available in the Yarn Cluster.
>>
>>
>>
>>
>> What is causing this?







Re:Re: The number of requested virtual cores for application master 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster.

2022-03-25 Thread Kaiqiang XIE谢开强









This YARN status looks normal, doesn't it?





At 2022-03-26 09:22:11, "胡伟华"  wrote:
>The error means the YARN cluster does not have enough available resources. You can check whether the YARN cluster is healthy.
>
>> On Mar 26, 2022, at 8:00 AM, Kaiqiang XIE谢开强  wrote:
>> 
>> Hi all, I'd like to ask another question.
>> I deployed standalone Flink on a Mac. After running bin/flink run -m yarn-cluster 
>> $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar --port 8882, I got the message: The 
>> number of requested virtual cores for application master 1 exceeds the 
>> maximum number of virtual cores 0 available in the Yarn Cluster.
>> 
>> 
>> 
>> 
>> What is causing this?


Re: The number of requested virtual cores for application master 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster.

2022-03-25 Thread 胡伟华
The error means the YARN cluster does not have enough available resources. You can check whether the YARN cluster is healthy.

> On Mar 26, 2022, at 8:00 AM, Kaiqiang XIE谢开强  wrote:
> 
> Hi all, I'd like to ask another question.
> I deployed standalone Flink on a Mac. After running bin/flink run -m yarn-cluster 
> $FLINK_HOME/examples/streaming/SocketWindowWordCount.jar --port 8882, I got the message: The 
> number of requested virtual cores for application master 1 exceeds the 
> maximum number of virtual cores 0 available in the Yarn Cluster.
> 
> 
> 
> 
> What is causing this?



The number of requested virtual cores for application master 1 exceeds the maximum number of virtual cores 0 available in the Yarn Cluster.

2022-03-25 Thread Kaiqiang XIE谢开强
Hi all, I'd like to ask another question.
I deployed standalone Flink on a Mac. After running bin/flink run -m yarn-cluster 
$FLINK_HOME/examples/streaming/SocketWindowWordCount.jar --port 8882, I got the message: The 
number of requested virtual cores for application master 1 exceeds the maximum 
number of virtual cores 0 available in the Yarn Cluster.




What is causing this?

Re:Re: Flink local cluster monitoring issue

2022-03-25 Thread Kaiqiang XIE谢开强
 @charley.xu2...@gmail.com 


I found the following error in the logs: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter [] - 
Failed to push metrics to PushGateway with jobName 
f0181efe9a70542d02dd5848b4f260c5, groupingKey {k1=v1, 
k2=v2}. java.net.ConnectException: unknown host. However, I had actually already configured the host in the hosts file.


Changing the conf file as follows made it work.
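
(The conf content referred to here is not included in the archive. For reference, a
typical PushGateway reporter section in flink-conf.yaml looks roughly like the following;
host and port are placeholders and must point to an address that is resolvable from the
Flink processes, which is what the "unknown host" error complains about.)

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: pushgateway.example.com
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS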


Many thanks to everyone for the guidance!!


At 2022-03-26 01:20:59, "Ning Xu"  wrote:
>You can first check in the job manager log whether the prometheus reporter configuration
>was correctly registered when the submitted job was initialized.
>
>On Fri, Mar 25, 2022 at 23:42 Kaiqiang  wrote:
>
>>
>> https://stackoverflow.com/questions/71617626/flink-standalone-get-no-metrics-to-pushgateway-for-grafana
>>
>>
>> Does anyone know what is going on here? Many thanks.
>>
>> Sent from my iPhone


Unsubscribe

2022-03-25 Thread 杨自闯
Unsubscribe

Re: flink-kubernetes-operator: Flink deployment stuck in scheduled state when increasing resource CPU above 1

2022-03-25 Thread Őrhidi Mátyás
It's worth checking the deployment->replica set->pod chain for error
messages.

On Fri, Mar 25, 2022, 19:49 Makas Tzavellas 
wrote:

> Hi,
>
> I have been experimenting with flink-kubernetes-operator today and it's
> very cool. However, the Flink application will be stuck in scheduled state
> if I increase the CPU to anything above 1 for JobManager and TaskManager.
> It works fine if I keep the CPU to 1.
>
> I am testing the Flink deployment with Minikube running with 4 CPUs and
> 8GB RAM.
>
> Unfortunately, I am not sure what to look out for to figure out why it's
> stuck.
>
> Appreciate it if someone could help point me in the right direction.
>
> Thanks!
>


flink-kubernetes-operator: Flink deployment stuck in scheduled state when increasing resource CPU above 1

2022-03-25 Thread Makas Tzavellas
Hi,

I have been experimenting with flink-kubernetes-operator today and it's
very cool. However, the Flink application gets stuck in the scheduled state
if I increase the CPU to anything above 1 for the JobManager and TaskManager.
It works fine if I keep the CPU at 1.

I am testing the Flink deployment with Minikube running with 4 CPUs and 8GB
RAM.

Unfortunately, I am not sure what to look out for to figure out why it's
stuck.

Appreciate it if someone could help point me in the right direction.

Thanks!


Re: Flink local cluster monitoring issue

2022-03-25 Thread Ning Xu
You can first check in the job manager log whether the prometheus reporter configuration
was correctly registered when the submitted job was initialized.

On Fri, Mar 25, 2022 at 23:42 Kaiqiang  wrote:

>
> https://stackoverflow.com/questions/71617626/flink-standalone-get-no-metrics-to-pushgateway-for-grafana
>
>
> Does anyone know what is going on here? Many thanks.
>
> Sent from my iPhone


Flink local cluster monitoring issue

2022-03-25 Thread Kaiqiang
https://stackoverflow.com/questions/71617626/flink-standalone-get-no-metrics-to-pushgateway-for-grafana


Does anyone know what is going on here? Many thanks.

Sent from my iPhone

Re: DBT-flink profile?

2022-03-25 Thread Georg Heiler
Hi,

"Use" is perhaps not the right word (yet); "experiment" fits better. But both
would be relevant, and in particular also the streaming option.

I also just found: https://materialize.com/docs/guides/dbt/ outlining how
dbt and streaming could potentially be married. Perhaps their integration
could serve as an example?

Best,
Georg

On Fri., March 25, 2022 at 05:01, Yun Gao  wrote:

> Hi Georg,
>
> May I double-check one thing about integrating with dbt:
> do you currently want to use it for batch jobs or streaming jobs?
>
> Best,
> Yun Gao
>
>
>
> --
> Sender:Georg Heiler
> Date:2022/03/25 01:27:26
> Recipient:user
> Theme:DBT-flink profile?
>
> Hi,
>
> is anyone working on a DBT Flink plugin/profile?
>
> https://docs.getdbt.com/reference/profiles.yml hosts many other databases
> - and I think this kind of support would be really beneficial for the SQL
> part of Flink.
>
> Best,
> Georg
>
>


"Native Kubernetes" sample in Flink documentation fails. JobManager Web Interface is wrongly generated. [Flink 1.14.4]

2022-03-25 Thread Burcu Gul POLAT EGRI
I am getting the following error when I try to execute the sample from the Flink 
documentation - Native Kubernetes.

I have succeeded in executing the first command in the documentation by adding some 
extra parameters with the help of this post.

user@local:~/flink-1.14.4$ ./bin/kubernetes-session.sh \

-Dkubernetes.cluster-id=dproc-example-flink-cluster-id \

-Dtaskmanager.memory.process.size=4096m \

-Dkubernetes.taskmanager.cpu=2 \

-Dtaskmanager.numberOfTaskSlots=4 \

-Dresourcemanager.taskmanager-timeout=360 \

-Dkubernetes.namespace=sdt-dproc-flink-test \

-Dkubernetes.config.file=/home/devuser/.kube/config \

-Dkubernetes.jobmanager.service-account=flink-service-account

After executing the above command, I listed the new pod as shown below.

user@local:~/flink-1.14.4$ kubectl get pods

NAME                                             READY   STATUS    RESTARTS   AGE
dproc-example-flink-cluster-id-68c79bf67-mwh52   1/1     Running   0          1m

Then, I executed the below command to submit the example job.

user@local:~/flink-1.14.4$ ./bin/flink run --target kubernetes-session \

-Dkubernetes.service-account=flink-service-account \

-Dkubernetes.cluster-id=dproc-example-flink-cluster-id \

-Dkubernetes.namespace=sdt-dproc-flink-test \

-Dkubernetes.config.file=/home/devuser/.kube/config

examples/batch/WordCount.jar --input /home/user/sometexts.txt --output 
/tmp/flinksample

After a while, I received the logs below:

2022-03-25 12:38:00,538 INFO  
org.apache.flink.kubernetes.KubernetesClusterDescriptor  [] - Retrieve 
flink cluster dproc-example-flink-cluster-id successfully, JobManager Web 
Interface: http://10.150.140.248:8081





 The program finished with the following exception:



org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at 
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.

at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)

at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)

at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:131)

at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:70)

at 
org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:93)

at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)

at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)

... 8 more

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
JobGraph.

at 
java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395)

at 
java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999)

at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)

... 16 more

Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.

at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$11(RestClusterClient.java:433)

at 
java.base/java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986)

at 

Unsubscribe

2022-03-25 Thread Peter Gee
Unsubscribe


Re: Flink Web ui not stable in kubernetes?

2022-03-25 Thread Guillaume Vauvert

Hi,

I agree that the changelog description of 
https://issues.apache.org/jira/browse/FLINK-25732 only talks about the 
technical root cause, not the user-facing consequences.

I have added a comment in https://issues.apache.org/jira/browse/FLINK-25732.

Have a nice day !

Guillaume

On 25/03/2022 10.31, Sebastian Struss wrote:

Hi Guillaume,

thank you for this great hint! It indeed fixed the mentioned issue.
Just from reading the changelog of 1.14.4 i would not have known that 
this fix is included, maybe i was searching for the wrong stuff though.


Have a great day!
Sebastian

On Fri, Mar 25, 2022 at 10:51 AM Guillaume Vauvert 
 wrote:


Hello Sebastian,

Multiple versions of Flink 1.14.x are known to have issue with
UI/CLI, please switch to Flink 1.14.4.

Best regards,

Guillaume

On 25/03/2022 08.42, Sebastian Struss wrote:

Hello all,

i've been setting up flink in my kubernetes cluster with 2 job
managers and 1 task manager (custom helm chart i wrote, no
flink CLI used).
I can access the web ui, but often it seems to switch pods which
i am connected to and as soon as i am connected to the standby
job manager it doesn't load at all.
The leader election does seem to work nicely, as when i kill the
leading pod the standby instance takes over after ~5s.
I do see errors like this when i browse the web ui:

"""
2022-03-24 15:38:17,269 ERROR
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] -
Unhandled exception.
java.util.concurrent.CancellationException: null
at
java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
~[?:1.8.0_302]
at

org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_302]
at
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at


Re: Flink Web ui not stable in kubernetes?

2022-03-25 Thread Sebastian Struss
Hi Guillaume,

thank you for this great hint! It indeed fixed the mentioned issue.
Just from reading the changelog of 1.14.4, I would not have known that this
fix was included; maybe I was searching for the wrong thing, though.

Have a great day!
Sebastian

On Fri, Mar 25, 2022 at 10:51 AM Guillaume Vauvert <
guillaume.vauvert@gmail.com> wrote:

> Hello Sebastian,
>
> Multiple versions of Flink 1.14.x are known to have issue with UI/CLI,
> please switch to Flink 1.14.4.
>
> Best regards,
>
> Guillaume
> On 25/03/2022 08.42, Sebastian Struss wrote:
>
> Hello all,
>
> i've been setting up flink in my kubernetes cluster with 2 job managers
> and 1 task manager (custom helm chart i wrote, no flink CLI used).
> I can access the web ui, but often it seems to switch pods which i am
> connected to and as soon as i am connected to the standby job manager it
> doesn't load at all.
> The leader election does seem to work nicely, as when i kill the leading
> pod the standby instance takes over after ~5s.
> I do see errors like this when i browse the web ui:
>
> """
> 2022-03-24 15:38:17,269 ERROR 
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler
>  [] - Unhandled exception.
> java.util.concurrent.CancellationException: null
> at
> java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
> ~[?:1.8.0_302]
> at 
> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
> ~[flink-dist_2.12-1.14.2.jar:1.14.2]
> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_302]
> at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> [flink-dist_2.12-1.14.2.jar:1.14.2]
> at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
> 

Re: Flink Web ui not stable in kubernetes?

2022-03-25 Thread Guillaume Vauvert

Hello Sebastian,

Multiple versions of Flink 1.14.x are known to have issues with the UI/CLI; 
please switch to Flink 1.14.4.


Best regards,

Guillaume

On 25/03/2022 08.42, Sebastian Struss wrote:

Hello all,

i've been setting up flink in my kubernetes cluster with 2 job 
managers and 1 task manager (custom helm chart i wrote, no flink CLI 
used).
I can access the web ui, but often it seems to switch pods which i am 
connected to and as soon as i am connected to the standby job manager 
it doesn't load at all.
The leader election does seem to work nicely, as when i kill the 
leading pod the standby instance takes over after ~5s.

I do see errors like this when i browse the web ui:

"""
2022-03-24 15:38:17,269 ERROR 
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  [] - 
Unhandled exception.

java.util.concurrent.CancellationException: null
at 
java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276) 
~[?:1.8.0_302]
at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83) 
~[flink-dist_2.12-1.14.2.jar:1.14.2]

at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_302]
at 
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) 
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:238) 

Unsubscribe

2022-03-25 Thread guoxb__...@sina.com

Unsubscribe


guoxb__...@sina.com


Re: Where can I find flink-kubernetes-operator project?

2022-03-25 Thread Makas Tzavellas
Thank you!

On Fri, Mar 25, 2022 at 4:55 PM Gyula Fóra  wrote:

> Hi Makas!
>
> The repo is here: https://github.com/apache/flink-kubernetes-operator
>
> We should add a link somewhere in the docs :)
>
> Cheers,
> Gyula
>
>
> On Fri, Mar 25, 2022 at 9:45 AM Makas Tzavellas 
> wrote:
>
>> Hello,
>>
>> I came across this document
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
>> and would like to give it a try. But I have not been able to find it
>> anywhere in Flink's main repository.
>>
>> Could someone please point me in the right direction?
>>
>> Thanks,
>>
>> Makas
>>
>


Re: Where can I find flink-kubernetes-operator project?

2022-03-25 Thread Gyula Fóra
Hi Makas!

The repo is here: https://github.com/apache/flink-kubernetes-operator

We should add a link somewhere in the docs :)

Cheers,
Gyula


On Fri, Mar 25, 2022 at 9:45 AM Makas Tzavellas 
wrote:

> Hello,
>
> I came across this document
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
> and would like to give it a try. But I have not been able to find it
> anywhere in Flink's main repository.
>
> Could someone please point me in the right direction?
>
> Thanks,
>
> Makas
>


flink cluster startup time

2022-03-25 Thread Frank Dekervel

Hello,

We run Flink using the Spotify Flink Kubernetes operator (job cluster 
mode). Everything works fine, including upgrades and crash recovery. We 
do not run the job manager in HA mode.


One of the problems we have is that upon upgrades (or during testing), 
starting the Flink cluster takes a very long time:


 * First the operator needs to create the cluster (JM+TM) and wait for
   it to respond to API requests. This already takes a couple of minutes.
 * Then the operator creates a job-submitter pod that submits the job
   to the cluster. The job is packaged as a fat jar, but it is already
   baked into the docker images we use (so technically there would be no
   need to "submit" it from a separate pod). The submission itself goes rather
   fast though (the time between the job submitter seeing the cluster is
   online and the "hello" log from the main program is <1 min).
 * Then the application needs to start up and load its state from the
   latest savepoint, which again takes a couple of minutes.

All steps take quite some time, and we are looking to reduce the startup 
time to allow for easier testing but also less downtime during upgrades. 
So I have some questions:


 * I wonder if the situation is the same for all Kubernetes operators.
   I really need some kind of operator, because otherwise I have to
   set which savepoint to load from myself on every startup.
 * What cluster startup time is considered acceptable / best
   practice?
 * If there are other tricks to reduce startup time, I would be very
   interested in knowing them :-)

There is also a discussion ongoing on running flink on spot nodes. I 
guess the startup time is relevant there too.


Thanks already
Frank






Where can I find flink-kubernetes-operator project?

2022-03-25 Thread Makas Tzavellas
Hello,

I came across this document
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
and would like to give it a try. But I have not been able to find it
anywhere in Flink's main repository.

Could someone please point me in the right direction?

Thanks,

Makas


Flink Web ui not stable in kubernetes?

2022-03-25 Thread Sebastian Struss
Hello all,

I've been setting up Flink in my Kubernetes cluster with 2 job managers and
1 task manager (custom Helm chart I wrote, no Flink CLI used).
I can access the web UI, but it often seems to switch the pod I am
connected to, and as soon as I am connected to the standby job manager it
doesn't load at all.
The leader election does seem to work nicely: when I kill the leading
pod, the standby instance takes over after ~5s.
I do see errors like this when I browse the web UI:

"""
2022-03-24 15:38:17,269 ERROR
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler
 [] - Unhandled exception.
java.util.concurrent.CancellationException: null
at
java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
~[?:1.8.0_302]
at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_302]
at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:238)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:71)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at 

Re: Flink kafka consumer disconnection, application processing stays behind

2022-03-25 Thread Qingsheng Ren
Hi Isidoros, 

Your watermark strategy looks fine to me. I’m not quite sure if it is related. 

Best regards, 

Qingsheng
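
For reference, the watermark setup described in the quoted message below (bounded
out-of-orderness plus an idleness timeout, assigned downstream of the source) follows
this general pattern in Flink 1.13; the event type and timestamp field are placeholders,
not the original code:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class WatermarkSketch {

    // Placeholder event type; the real job uses its own journal/record class.
    public static class Event {
        public long eventTimeMillis;
    }

    public static DataStream<Event> withWatermarks(DataStream<Event> stream) {
        WatermarkStrategy<Event> strategy = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((event, recordTimestamp) -> event.eventTimeMillis)
                .withIdleness(Duration.ofMinutes(1));
        // Assigned downstream of the source, so watermark progress depends on this
        // operator seeing new records; if the consumer stalls, the watermark stalls too.
        return stream.assignTimestampsAndWatermarks(strategy);
    }
}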

> On Mar 24, 2022, at 21:11, Isidoros Ioannou  wrote:
> 
> Hi Qingsheng,
> 
> thank you a lot for you response.
> The message I see from the consumer before the log exception I provided 
> previously is this:
> "locationInformation": 
> "org.apache.kafka.clients.NetworkClient.handleTimedOutRequests(NetworkClient.java:778)",
> "logger": "org.apache.kafka.clients.NetworkClient",
> "message": "[Consumer 
> clientId=consumer-realtime-analytics-eu-production-node2-2, 
> groupId=realtime-analytics-eu-production-node2] Disconnecting from node 3 due 
> to request timeout."
> 
> I saw it in debug mode, and that's the reason I increased 
> "request.timeout.ms".
> I will follow your advice and investigate the broker logs once the event 
> occurs again.
> 
> Regarding the backpressure: the 10 CEP operators we have use some iterative 
> conditions that add some burden, and in periods of high load the operators 
> turn red in the Flink UI, so these add to the backpressure.
> However, under moderate load the operators perform fine, except when we 
> have disconnections. It seems that after a disconnection the watermarks are 
> not emitted quickly, causing the operators not to release the
> data to the sinks. I don't know if this actually helps, but is there any 
> chance that it could be a problem with how we have configured the watermarks?
> 
> On Thu, Mar 24, 2022 at 10:27 AM, Qingsheng Ren  wrote:
> έγραψε:
> Hi Isidoros,
> 
> I’m not sure in which kind of way the timeout and the high back pressure are 
> related, but I think we can try to resolve the request timeout issue first. 
> You can take a look at the request log on Kafka broker and see if the request 
> was received by broker, and how long it takes for broker to handle it. By 
> default the request log is on WARN level, and you may want to increase it to 
> DEBUG or TRACE to reveal more information. 
> 
> Another thought in my mind is about the content of the record, since you 
> mentioned extremely high back pressure after the disconnection issue. If some 
> messages are quite large or complex, it might block the network or require 
> more resources to make the serde, even burden some operator in the pipeline 
> and finally lead to back pressure. Once the back pressure happens in the 
> pipeline, you can try to locate the operator causing the back pressure and 
> make some analysis to see why the throughput drops, or dump the record to see 
> if there’s something special in it. 
> 
> Hope these could be helpful! 
> 
> Best regards, 
> 
> Qingsheng
> 
> > On Mar 23, 2022, at 19:19, Isidoros Ioannou  wrote:
> > 
> > Hi, we are running flink 1.13.2 version on Kinesis Analytics. Our source is 
> > a kafka topic with one partition so far and we are using the 
> > FlinkKafkaConsumer (kafka-connector-1.13.2) 
> > Sometimes we get some errors from the consumer like the below:
> > 
> > "locationInformation":"org.apache.kafka.clients.FetchSessionHandler.handleError(FetchSessionHandler.java:445)",
> > "logger": "org.apache.kafka.clients.FetchSessionHandler",
> > "message": "[Consumer 
> > clientId=consumer-realtime-analytics-eu-production-node2-2, 
> > groupId=realtime-analytics-eu-production-node2] Error sending fetch request 
> > (sessionId=1343463307, epoch=172059) to node 3: 
> > org.apache.kafka.common.errors.DisconnectException.",
> > "threadName": "Kafka Fetcher for Source: Kafka -> Map -> Filter -> Map 
> > -> Filter -> Timestamps/Watermarks -> Filter (1/1)#0",
> > 
> > With the debug logging it appeared that this happens due to request timeout 
> > so I have increased the request.timeout.ms to 6 , however it did not 
> > resolve the issue. Even if I get the disconnection I can see that after 1s 
> > the consumer sends a successful fetchRequest.
> > 
> > The problem we have noticed is that after the disconnection the application 
> > stays behind from processing. the backpressure on the source gets 100% and 
> > the app consumes events at a lower rate even if we do not have much traffic 
> > to cope with. 
> > 
> > We use eventTime and the watermarks are not generated in the consumer since 
> > we have one partition. the source is the following
> > 
> > DataStream stream =
> > 
> > env.addSource(consumerBase).name("Kafka").uid("Kafka").filter(f -> 
> > !f.getServerId().equals("Demo150")).keyBy(ServerAwareJournal::getServerId);
> > 
> > and then we assign the following watermark: 
> > 
> > WatermarkStrategy. > >forBoundedOutOfOrderness(Duration.ofSeconds(3))
> > .withTimestampAssigner((element, recordTimestamp) -> 
> > element.getMessage().getDateTime().atZone(journalTimezone).toInstant()
> > .toEpochMilli()).withIdleness(Duration.ofMinutes(1));
> > 
> > the upstream operators are 10 cep operators with a parallelism of 15 and 
> > then there is a union of the data emitted 

Re: Using Amazon EC2 Spot instances with Flink

2022-03-25 Thread Yuval Itzchakov
My company is running Flink in Kubernetes with spot instances for both JM
and TM.

Feel free to reach out.

On Thu, Mar 24, 2022, 20:33 Ber, Jeremy  wrote:

>
> https://aws.amazon.com/blogs/compute/optimizing-apache-flink-on-amazon-eks-using-amazon-ec2-spot-instances/
>
>
>
> Sharing this link FWIW.
>
>
>
> Jeremy
>
>
>
> *From: *David Anderson 
> *Date: *Thursday, March 24, 2022 at 1:32 PM
> *To: *"Vasileva, Valeriia" 
> *Cc: *"user@flink.apache.org" 
> *Subject: *RE: [EXTERNAL] Using Amazon EC2 Spot instances with Flink
>
>
>
> *CAUTION*: This email originated from outside of the organization. Do not
> click links or open attachments unless you can confirm the sender and know
> the content is safe.
>
>
>
> I remember a Flink Forward talk several years ago where the speaker shared
> how they were running on spot instances. They were catching the
> notification that the instance was being shut down, taking a savepoint, and
> relaunching. They were also proactively monitoring spot instance prices
> around the world, and migrating to the least expensive region.
>
>
>
> David
>
>
>
> On Tue, Mar 22, 2022 at 11:33 PM Vasileva, Valeriia <
> c-valeriia.vasil...@disneystreaming.com> wrote:
>
> Hello, folks!
>
>
>
> I was wondering if there are some good articles on how to use EC2 Spot
> instances with Flink?
>
>
>
> I would appreciate your help! Thank you!
>
>
>
> Kind Regards,
>
> Valeriia
>
>


Unsubscribe

2022-03-25 Thread hihl