Re: FlinkJobNotFoundException

2021-09-29 Thread Matthias Pohl
I didn't receive any email. But we'd rather not provide individual support.
Please share the logs on the mailing list. This way, anyone can participate
in the discussion.

Best,
Matthias

On Wed, Sep 29, 2021 at 8:12 PM Gusick, Doug S  wrote:

> Hi Matthias,
>
> Thank you for getting back. We have been looking into upgrading to a newer
> version, but have not completed full testing just yet.
>
> I was unable to find a previous error in the JM logs. You should have
> received an email with details to a “lockbox”. I have uploaded the job
> manager logs there. Please let me know if you need any more information.
>
> Thank you,
>
> Doug
>
> *From:* Matthias Pohl 
> *Sent:* Wednesday, September 29, 2021 12:00 PM
> *To:* Gusick, Doug S [Engineering] 
> *Cc:* user@flink.apache.org; Erai, Rahul [Engineering] <
> rahul.e...@ny.email.gs.com>
> *Subject:* Re: FlinkJobNotFoundException
>
> Hi Doug,
>
> thanks for reaching out to the community. First of all, 1.9.2 is quite an
> old Flink version. You might want to consider upgrading to a newer version.
> The community only offers support for the two most-recent Flink versions.
> Newer versions might include fixes for your issue.
>
> But back to your actual problem: The logs you're providing only show that
> some job switched into FINISHED state. Is there some error showing up
> earlier in the logs which you might have missed? It would be helpful if you
> could share the complete JobManager logs to get a better understanding of
> what's going on.
>
> Best,
> Matthias
>
> On Wed, Sep 29, 2021 at 3:47 PM Gusick, Doug S  wrote:
>
> Hello,
>
> We are facing an issue with some of our applications that are submitting a
> high volume of jobs to Flink (we are using v1.9.2). We are observing that
> numerous jobs (in this case 44 out of 350+) fail with the same
> FlinkJobNotFoundException within a 45-second timeframe.
>
> From our client logs, this is the exception we can see:
>
> Calc Engine: Caused by: 
> org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (d0991f0ae712a9df710aa03311a32c8c)]
>
> Calc Engine:   at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>
> Calc Engine:   at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
> Calc Engine:   ... 3 more
>
> This is the first job to fail with the above exception. From the
> JobManager logs, we can see that the job goes to FINISHED state, and then
> we see the following exception:
>
> 2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink
> Java Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c)
> switched from state RUNNING to FINISHED.
>
> 2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
> d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
>
> 2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.jobmaster.JobMaster  - Stopping
> the JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT
> 2021(d0991f0ae712a9df710aa03311a32c8c).
>
> 2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39]
> org.apache.flink.yarn.YarnResourceManager - Disconnect
> job manager
> 0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
> for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
>
> 2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91]
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  -
> Exception occurred in REST handler:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job (d0991f0ae712a9df710aa03311a32c8c)
>
> Here are the relevant logs from the TaskManager. We can see that the
> JobLeaderService tries to reconnect to the job. Any ideas as to why it is
> trying to reconnect?
>
> 2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
> request b26c04706fd5aad03dfdca8691f1bf1c for job 
> d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
> .
>
> 2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
> d

Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Konstantin Knauf
Big thanks to Dawid, Joe, Xintong, and everyone who contributed to the
release! Well done!

Cheers,

Konstantin

On Thu, Sep 30, 2021 at 3:12 AM Yangze Guo  wrote:

> Thanks, Xintong, Joe, Dawid for the great work, thanks to everyone
> involved!
>
> Best,
> Yangze Guo
>
> On Thu, Sep 30, 2021 at 12:02 AM Rion Williams 
> wrote:
> >
> > Great news all! Looking forward to it!
> >
> > > On Sep 29, 2021, at 10:43 AM, Theo Diefenthal <
> theo.diefent...@scoop-software.de> wrote:
> > >
> > > 
> > > Awesome, thanks for the release.
> > >
> > > - Original Message -
> > > From: "Dawid Wysakowicz"
> > > To: "dev" , "user" , annou...@apache.org
> > > Sent: Wednesday, September 29, 2021 15:59:47
> > > Subject: [ANNOUNCE] Apache Flink 1.14.0 released
> > >
> > > The Apache Flink community is very happy to announce the release of
> > > Apache Flink 1.14.0.
> > >
> > > Apache Flink® is an open-source stream processing framework for
> > > distributed, high-performing, always-available, and accurate data
> > > streaming applications.
> > >
> > > The release is available for download at:
> > > https://flink.apache.org/downloads.html
> > >
> > > Please check out the release blog post for an overview of the
> > > improvements in this release:
> > > https://flink.apache.org/news/2021/09/29/release-1.14.0.html
> > >
> > > The full release notes are available in Jira:
> > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
> > >
> > > We would like to thank all contributors of the Apache Flink community
> > > who made this release possible!
> > >
> > > Regards,
> > > Xintong, Joe, Dawid
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Yangze Guo
Thanks, Xintong, Joe, Dawid for the great work, thanks to everyone involved!

Best,
Yangze Guo

On Thu, Sep 30, 2021 at 12:02 AM Rion Williams  wrote:
>
> Great news all! Looking forward to it!
>
> > On Sep 29, 2021, at 10:43 AM, Theo Diefenthal 
> >  wrote:
> >
> > 
> > Awesome, thanks for the release.
> >
> > - Original Message -
> > From: "Dawid Wysakowicz"
> > To: "dev" , "user" , annou...@apache.org
> > Sent: Wednesday, September 29, 2021 15:59:47
> > Subject: [ANNOUNCE] Apache Flink 1.14.0 released
> >
> > The Apache Flink community is very happy to announce the release of
> > Apache Flink 1.14.0.
> >
> > Apache Flink® is an open-source stream processing framework for
> > distributed, high-performing, always-available, and accurate data
> > streaming applications.
> >
> > The release is available for download at:
> > https://flink.apache.org/downloads.html
> >
> > Please check out the release blog post for an overview of the
> > improvements in this release:
> > https://flink.apache.org/news/2021/09/29/release-1.14.0.html
> >
> > The full release notes are available in Jira:
> > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
> >
> > We would like to thank all contributors of the Apache Flink community
> > who made this release possible!
> >
> > Regards,
> > Xintong, Joe, Dawid


Flink application mode with no ui , how to start job using k8s ?

2021-09-29 Thread Dhiru
Hi,
My requirement is to create a Flink cluster in Application Mode on k8s without
exposing the UI. I want to start a long-running job that is instantiated when
Flink boots and then keeps running. I use the resource files
jobmanager-application-ha.yaml and taskmanager-job-deployment.yaml to create
the cluster
(https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions).

a) I need to start the job at run time. I can bundle my jar with the Flink
image so that the jar can be instantiated.
b) Can I apply an HPA (Horizontal Pod Autoscaler) to the TaskManagers, so that
the number of TaskManager instances goes up and down with the workload?
--kumar
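On question (b): a Kubernetes HorizontalPodAutoscaler sizes a Deployment from an observed metric using the rule documented for the HPA controller, desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue), with no change while that ratio stays within a tolerance of 1.0 (0.1 by default). The Python sketch below only mirrors that documented rule so you can estimate how the TaskManager count would move; the function name and the min/max defaults are hypothetical, and it is not the controller's actual code. Extra TaskManagers only help if the running job can use them: Flink 1.13 introduced Reactive Mode (`scheduler-mode: reactive`) for standalone application clusters with this kind of autoscaling in mind, so check its documentation and limitations for your version.

```python
import math


def desired_replicas(current_replicas: int,
                     current_metric: float,
                     target_metric: float,
                     min_replicas: int = 1,
                     max_replicas: int = 10,
                     tolerance: float = 0.1) -> int:
    """Approximate the documented HPA scaling rule for a single metric.

    desired = ceil(current_replicas * current_metric / target_metric),
    unless the ratio is within `tolerance` of 1.0, in which case the size
    is kept. min/max clamping stands in for the HPA's minReplicas and
    maxReplicas fields (the defaults here are hypothetical).
    """
    if current_replicas < 1 or target_metric <= 0:
        raise ValueError("need at least one replica and a positive target")
    ratio = current_metric / target_metric
    if abs(ratio - 1.0) <= tolerance:
        desired = current_replicas  # close enough to target: no scaling
    else:
        desired = math.ceil(current_replicas * ratio)
    return max(min_replicas, min(max_replicas, desired))
```

For example, two TaskManagers at 90% CPU against a 60% target would be scaled to three, while 63% against 60% stays inside the tolerance and leaves the count unchanged.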




Re: Flink run different jars

2021-09-29 Thread Qihua Yang
Thanks a lot Yangze. That is very helpful!
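Following Yangze's pointer: for a standalone session cluster, `bin/start-cluster.sh` reads `conf/workers` (one host per line) and starts one TaskManager per line, so you do not have to launch each TaskManager by hand; `bin/taskmanager.sh start` can still add one to a running cluster. The Python helper below is a hypothetical sketch that writes that file. Host names are placeholders, and repeating a host to run several TaskManagers on it is an assumption worth checking against the standalone deployment docs for your version.

```python
from pathlib import Path
from typing import Iterable, Tuple


def render_workers(hosts: Iterable[Tuple[str, int]]) -> str:
    """Build conf/workers content with one line per TaskManager to start.

    `hosts` is a sequence of (hostname, number_of_taskmanagers) pairs;
    a host with count 2 is written on two lines.
    """
    lines = []
    for host, count in hosts:
        if count < 1:
            raise ValueError(f"{host}: need at least one TaskManager")
        lines.extend([host] * count)
    return "\n".join(lines) + "\n"


def write_workers(flink_home: str, hosts: Iterable[Tuple[str, int]]) -> Path:
    """Write the rendered content to <flink_home>/conf/workers."""
    path = Path(flink_home) / "conf" / "workers"
    path.write_text(render_workers(hosts))
    return path
```

After writing the file, `bin/start-cluster.sh` would start the JobManager plus the listed TaskManagers.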

On Tue, Sep 28, 2021 at 11:11 PM Yangze Guo  wrote:

> You need to edit conf/workers. See the example config[1] and the
> process[2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode
>
> Best,
> Yangze Guo
>
> On Wed, Sep 29, 2021 at 1:02 PM Qihua Yang  wrote:
> >
> > Hi Yangze,
> >
> > Thanks a lot for your reply. References are very helpful!
> > Another quick question. Reference 1 can start a standalone cluster
> > (session mode). That cluster has a JobManager, and I can submit jobs to
> > run. How about TaskManagers? Do I need to manually start multiple
> > TaskManagers?
> > Is there a complete example to show the process?
> >
> > Thanks,
> > Qihua
> >
> >
> > On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo  wrote:
> >>
> >> Hi, Qihua
> >>
> >> IIUC, what you want might be a standalone cluster[1] or session
> >> cluster[2][3].
> >>
> >> [1]
> >> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> >> [2]
> >> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> >> [3]
> >> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
> >>
> >> Best,
> >> Yangze Guo
> >>
> >> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang  wrote:
> >> >
> >> > Hi,
> >> >
> >> > Is it possible to run a Flink app without a job? What I am trying
> >> > to do is build multiple jars and switch between them to run different
> >> > jobs. I am not sure if Flink supports this mode. I saw that the REST API
> >> > can upload a jar, cancel a job, and run a jar.
> >> > Right now I can upload a jar to Flink. But when I cancel a job, Flink
> >> > restarts automatically. I checked the log, and it shows the lines below.
> >> > Can anyone help me out?
> >> >
> >> > Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: CANCELED
> >> >     at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
> >> >     ... 41 common frames omitted
> >> > Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
> >> >     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
> >> >     at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
> >> >     ... 41 common frames omitted
> >> >
> >> > Thanks!
>
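The `deployment.application` classes in the trace suggest the cluster was started in Application Mode, where the cluster's lifetime is tied to its job, which would fit the restart after cancelling. In a session cluster, as Yangze suggests, the cluster keeps running between jobs and jars can be swapped through the REST API. The Python sketch below builds (but does not send) requests for two REST endpoints: `POST /jars/:jarid/run` to start a jar that was already uploaded with `POST /jars/upload` (the upload response contains the jar id), and `PATCH /jobs/:jobid?mode=cancel` to cancel a job. The base URL, jar id, and entry class are placeholders.

```python
import json
import urllib.request
from typing import List, Optional

# Placeholder address of the session cluster's REST endpoint (assumption).
BASE_URL = "http://localhost:8081"


def run_jar_request(jar_id: str,
                    entry_class: Optional[str] = None,
                    program_args: Optional[List[str]] = None,
                    base_url: str = BASE_URL) -> urllib.request.Request:
    """Build POST /jars/<jar_id>/run for a jar already uploaded to the cluster."""
    body = {}
    if entry_class:
        body["entryClass"] = entry_class
    if program_args:
        body["programArgsList"] = program_args
    return urllib.request.Request(
        f"{base_url}/jars/{jar_id}/run",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def cancel_job_request(job_id: str,
                       base_url: str = BASE_URL) -> urllib.request.Request:
    """Build PATCH /jobs/<job_id>?mode=cancel to cancel a running job."""
    return urllib.request.Request(
        f"{base_url}/jobs/{job_id}?mode=cancel", method="PATCH")
```

Passing either request to `urllib.request.urlopen` would send it; the JSON response of the run call should contain the new job's `jobid`, which can later be handed to `cancel_job_request` without tearing down a session cluster.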


Start Flink cluster, k8s pod behavior

2021-09-29 Thread Qihua Yang
Hi,
I deployed Flink in session mode without running any jobs, and I saw the logs
below. That looks normal and matches what the Flink manual shows.

+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.

But when I check with kubectl, the pod status is Completed. After a while,
the status changes to CrashLoopBackOff, and the pod restarts.

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     Completed          5          5m27s

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s

Can anyone help me understand why?
Why does Kubernetes regard this pod as completed and restart it? Should I
configure something, either on the Flink side or on the Kubernetes side?
According to the Flink manual, after the cluster is started, I can upload a
jar to run the application.

Thanks,
Qihua
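The log lines above ("Starting HA cluster with 1 masters", "Starting standalonesession daemon ...") look like the output of `bin/start-cluster.sh`, which launches Flink processes as background daemons and then returns. If that script is the container's main process, the container exits as soon as it returns, Kubernetes reports the pod as Completed, and with a Deployment's restart policy it keeps restarting until it lands in CrashLoopBackOff. A common remedy is to run each process in the foreground with `jobmanager.sh start-foreground` or `taskmanager.sh start-foreground`, as the official Docker image does. The Python entrypoint below is a hypothetical sketch of that idea; the `/opt/flink` location is taken from the log above and the function names are made up.

```python
import os
from typing import List

# Assumption: Flink lives under /opt/flink, as in the official Docker image
# and in the "+ /opt/flink/bin/run-job-manager.sh" line above.
FLINK_HOME = "/opt/flink"

# Hypothetical mapping from a role name to the Flink launcher script.
SCRIPTS = {"jobmanager": "jobmanager.sh", "taskmanager": "taskmanager.sh"}


def foreground_command(role: str, flink_home: str = FLINK_HOME) -> List[str]:
    """Return the argv that runs one Flink process in the foreground.

    'start-foreground' keeps the JVM attached to the caller, unlike 'start',
    which hands the process off to a background daemon and returns.
    """
    if role not in SCRIPTS:
        raise ValueError(f"unknown role: {role!r}")
    return [os.path.join(flink_home, "bin", SCRIPTS[role]), "start-foreground"]


def exec_in_foreground(role: str) -> None:
    """Replace the current process (the container's main process) with Flink.

    After exec, the container lives exactly as long as the Flink JVM, so it
    no longer exits right after startup.
    """
    argv = foreground_command(role)
    os.execv(argv[0], argv)
```

The standalone Kubernetes docs run the JobManager and the TaskManagers as separate Deployments, so each container would call `exec_in_foreground` with its own role rather than starting both processes in one pod.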


RE: FlinkJobNotFoundException

2021-09-29 Thread Gusick, Doug S
Hi Matthias,

Thank you for getting back. We have been looking into upgrading to a newer 
version, but have not completed full testing just yet.

I was unable to find a previous error in the JM logs. You should have received 
an email with details to a “lockbox”. I have uploaded the job manager logs 
there. Please let me know if you need any more information.

Thank you,
Doug

From: Matthias Pohl 
Sent: Wednesday, September 29, 2021 12:00 PM
To: Gusick, Doug S [Engineering] 
Cc: user@flink.apache.org; Erai, Rahul [Engineering]
Subject: Re: FlinkJobNotFoundException

Hi Doug,
thanks for reaching out to the community. First of all, 1.9.2 is quite an old
Flink version. You might want to consider upgrading to a newer version. The
community only offers support for the two most-recent Flink versions. Newer
versions might include fixes for your issue.

But back to your actual problem: The logs you're providing only show that some 
job switched into FINISHED state. Is there some error showing up earlier in the 
logs which you might have missed? It would be helpful if you could share the 
complete JobManager logs to get a better understanding of what's going on.

Best,
Matthias

On Wed, Sep 29, 2021 at 3:47 PM Gusick, Doug S <doug.gus...@gs.com> wrote:
Hello,

We are facing an issue with some of our applications that are submitting a high
volume of jobs to Flink (we are using v1.9.2). We are observing that numerous
jobs (in this case 44 out of 350+) fail with the same FlinkJobNotFoundException
within a 45-second timeframe.

From our client logs, this is the exception we can see:


Calc Engine: Caused by: org.apache.flink.runtime.rest.util.RestClientException: 
[org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)]

Calc Engine:   at 
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)

Calc Engine:   at 
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)

Calc Engine:   at 
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)

Calc Engine:   at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)

Calc Engine:   at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

Calc Engine:   ... 3 more


This is the first job to fail with the above exception. From the JobManager
logs, we can see that the job goes to FINISHED state, and then we see the
following exception:

2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c) switched 
from state RUNNING to FINISHED.
2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the 
JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT 
2021(d0991f0ae712a9df710aa03311a32c8c).
2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39] 
org.apache.flink.yarn.YarnResourceManager - Disconnect job 
manager 
0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
 for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
Exception occurred in REST handler: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)
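To line up the JobManager entries above, one can parse their timestamps and measure how long after the job reached its terminal state the handler for `/jobs/:jobid/execution-result` (`JobExecutionResultHandler`) reported it missing; in this excerpt the gap is 1.319 seconds (04:54:16,937 to 04:54:18,256). The Python sketch below is a diagnostic aid, not an explanation of the failure. It assumes the log prefix format shown above and that each wrapped entry has been joined back onto one line; the function names are hypothetical.

```python
import re
from datetime import datetime
from typing import List, Optional, Tuple

# Matches the prefix used in the JobManager excerpt, e.g.
# "2021-09-28 04:54:16,937 INFO  [...]".
PREFIX = re.compile(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}) (\w+)")

Event = Tuple[datetime, str, str]


def events_for_job(lines: List[str], job_id: str) -> List[Event]:
    """Return sorted (timestamp, level, line) tuples for entries mentioning job_id."""
    events = []
    for line in lines:
        match = PREFIX.match(line)
        if match and job_id in line:
            ts = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S,%f")
            events.append((ts, match.group(2), line))
    return sorted(events)


def seconds_between(events: List[Event], first: str, then: str) -> Optional[float]:
    """Seconds from the first entry containing `first` to the next containing `then`."""
    start = next((ts for ts, _, line in events if first in line), None)
    if start is None:
        return None
    end = next((ts for ts, _, line in events if then in line and ts >= start), None)
    return None if end is None else (end - start).total_seconds()
```

Running `events_for_job` over the full JobManager and TaskManager logs for each of the 44 failed job ids would show whether the same short gap appears every time.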


Here are the relevant logs from the TaskManager. We can see that the
JobLeaderService tries to reconnect to the job. Any ideas as to why it is
trying to reconnect?

2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request b26c04706fd5aad03dfdca8691f1bf1c for job 
d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
.

2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
d0991f0ae712a9df710aa03311a32c8c for job leader monitoring.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Successful 
registration at job manager 
akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392 for job 
d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,397 INFO  [flink

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
...and if possible, it would be helpful to provide debug logs as well.

On Wed, Sep 29, 2021 at 6:33 PM Matthias Pohl 
wrote:

> Could you provide the entire JobManager logs so that we can see what's going
> on?
>
> On Wed, Sep 29, 2021 at 12:42 PM Javier Vegas  wrote:
>
>> Thanks again, Matthias!
>>
>> After passing -Djobmanager.rpc.address=$HOST and -Djobmanager.rpc.port=$PORT0
>> as params to appmaster.sh, I see in the log that they resolve to the correct
>> values:
>>
>> -Djobmanager.rpc.address=10.0.23.35 -Djobmanager.rpc.port=31009
>>
>> But a bit later the appmaster dies with this new error. It is unclear
>> which address it is trying to bind. I added the explicit params
>> -Drest.bind-port=8081 and -Drest.port=8081 in case jobmanager.rpc.port was
>> somehow interfering, but that didn't help.
>>
>> 2021-09-29 10:29:59.845 [main] INFO  
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting 
>> MesosSessionClusterEntrypoint down with application status FAILED. 
>> Diagnostics java.net.BindException: Cannot assign requested address
>>  at java.base/sun.nio.ch.Net.bind0(Native Method)
>>  at java.base/sun.nio.ch.Net.bind(Unknown Source)
>>  at java.base/sun.nio.ch.Net.bind(Unknown Source)
>>  at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:550)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:248)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>>  at 
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>>  at java.base/java.lang.Thread.run(Unknown Source)
>>
>> On Wed, Sep 29, 2021 at 2:36 AM Matthias Pohl 
>> wrote:
>>
>>> The port has its separate configuration parameter jobmanager.rpc.port [1]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
>>>
>>> On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:
>>>
>>>> Matthias, thanks for the suggestion! I changed my
>>>> jobmanager.rpc.address param from $HOSTNAME to $HOST:$PORT0, which in the
>>>> log I see resolves properly to the host IP and port mapped to 8081:
>>>>
>>>> 2021-09-29 07:58:05.452 [main] INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
>>>> -Djobmanager.rpc.address=10.0.22.114:31894
>>>>
>>>> which is very promising. But sadly, a little bit later the appmaster dies
>>>> with this error:
>>>>
>>>> 2021-09-29 07:58:05.648 [main] INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
>>>> cluster services.
>>>> 2021-09-29 07:58:05.674 [main] INFO
>>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
>>>> MesosSessionClusterEntrypoint down with application status FAILED.
>>>> Diagnostics org.apache.flink.configuration.IllegalConfigurationException:
>>>> The configured hostname is not valid
>>>> at
>>>> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
>>>> at
>>>> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
>>>> at
>>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
>>>> at
>>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
Could you provide the entire JobManager logs so that we can see what's going
on?
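The quoted thread below comes down to keeping host and port in separate options: `-Djobmanager.rpc.address=10.0.22.114:31894` fails with "The configured hostname is not valid", and Matthias points to `jobmanager.rpc.port` for the port. The Python sketch below (function name hypothetical) turns a `host:port` pair, such as Mesos' `$HOST:$PORT0`, into the two dynamic properties. The optional `-Drest.bind-address=0.0.0.0` flag is an unverified assumption about the later `BindException`: that trace is a Netty server bind, so it may come from the REST endpoint trying to bind to an IP address the container does not own. Check the configuration reference for your Flink version before relying on it.

```python
from typing import List


def rpc_dynamic_properties(host_and_port: str, bind_all_rest: bool = False) -> List[str]:
    """Split "host:port" into separate -D flags for the appmaster script.

    jobmanager.rpc.address must hold only the host name or IP;
    the port belongs in jobmanager.rpc.port.
    """
    host, sep, port = host_and_port.rpartition(":")
    if not sep or not host:
        raise ValueError(f"expected host:port, got {host_and_port!r}")
    if not port.isdigit() or not 0 < int(port) < 65536:
        raise ValueError(f"invalid port in {host_and_port!r}")
    flags = [f"-Djobmanager.rpc.address={host}", f"-Djobmanager.rpc.port={port}"]
    if bind_all_rest:
        # Assumption: bind the REST server to all interfaces instead of the
        # advertised (possibly non-local) address.
        flags.append("-Drest.bind-address=0.0.0.0")
    return flags
```

For example, `rpc_dynamic_properties("10.0.22.114:31894")` yields the address and port as two separate flags, which is the form Matthias recommends.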

On Wed, Sep 29, 2021 at 12:42 PM Javier Vegas  wrote:

> Thanks again, Matthias!
>
> After passing -Djobmanager.rpc.address=$HOST and -Djobmanager.rpc.port=$PORT0
> as params to appmaster.sh, I see in the log that they resolve to the correct
> values:
>
> -Djobmanager.rpc.address=10.0.23.35 -Djobmanager.rpc.port=31009
>
> But a bit later the appmaster dies with this new error. It is unclear which
> address it is trying to bind. I added the explicit params
> -Drest.bind-port=8081 and -Drest.port=8081 in case jobmanager.rpc.port was
> somehow interfering, but that didn't help.
>
> 2021-09-29 10:29:59.845 [main] INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting 
> MesosSessionClusterEntrypoint down with application status FAILED. 
> Diagnostics java.net.BindException: Cannot assign requested address
>   at java.base/sun.nio.ch.Net.bind0(Native Method)
>   at java.base/sun.nio.ch.Net.bind(Unknown Source)
>   at java.base/sun.nio.ch.Net.bind(Unknown Source)
>   at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:550)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:248)
>   at 
> org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
>   at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>   at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
>   at java.base/java.lang.Thread.run(Unknown Source)
>
> On Wed, Sep 29, 2021 at 2:36 AM Matthias Pohl 
> wrote:
>
>> The port has its separate configuration parameter jobmanager.rpc.port [1]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
>>
>> On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:
>>
>>> Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
>>> param from $HOSTNAME to $HOST:$PORT0 which in the log I see resolves
>>> properly to the host IP and port mapped to 8081
>>>
>>> 2021-09-29 07:58:05.452 [main] INFO
>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
>>> -Djobmanager.rpc.address=10.0.22.114:31894
>>>
>>> which is very promising. But sadly, a little bit later the appmaster dies
>>> with this error:
>>>
>>> 2021-09-29 07:58:05.648 [main] INFO
>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
>>> cluster services.
>>> 2021-09-29 07:58:05.674 [main] INFO
>>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
>>> MesosSessionClusterEntrypoint down with application status FAILED.
>>> Diagnostics org.apache.flink.configuration.IllegalConfigurationException:
>>> The configured hostname is not valid
>>> at
>>> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
>>> at
>>> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
>>> at
>>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
>>> at
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRp

Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Rion Williams
Great news all! Looking forward to it!

> On Sep 29, 2021, at 10:43 AM, Theo Diefenthal 
>  wrote:
> 
> 
> Awesome, thanks for the release.
> 
> - Original Message -
> From: "Dawid Wysakowicz"
> To: "dev" , "user" , annou...@apache.org
> Sent: Wednesday, September 29, 2021 15:59:47
> Subject: [ANNOUNCE] Apache Flink 1.14.0 released
> 
> The Apache Flink community is very happy to announce the release of
> Apache Flink 1.14.0.
>  
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data
> streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html
>  
> Please check out the release blog post for an overview of the
> improvements in this release:
> https://flink.apache.org/news/2021/09/29/release-1.14.0.html
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
>  
> We would like to thank all contributors of the Apache Flink community
> who made this release possible!
>  
> Regards,
> Xintong, Joe, Dawid


Re: FlinkJobNotFoundException

2021-09-29 Thread Matthias Pohl
Hi Doug,
thanks for reaching out to the community. First of all, 1.9.2 is quite an
old Flink version. You might want to consider upgrading to a newer version.
The community only offers support for the two most-recent Flink versions.
Newer versions might include fixes for your issue.

But back to your actual problem: The logs you're providing only show that
some job switched into FINISHED state. Is there some error showing up
earlier in the logs which you might have missed? It would be helpful if you
could share the complete JobManager logs to get a better understanding of
what's going on.

Best,
Matthias

On Wed, Sep 29, 2021 at 3:47 PM Gusick, Doug S  wrote:

> Hello,
>
> We are facing an issue with some of our applications that are submitting a
> high volume of jobs to Flink (we are using v1.9.2). We are observing that
> numerous jobs (in this case 44 out of 350+) fail with the same
> FlinkJobNotFoundException within a 45-second timeframe.
>
> From our client logs, this is the exception we can see:
>
> Calc Engine: Caused by: 
> org.apache.flink.runtime.rest.util.RestClientException: 
> [org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
> Flink job (d0991f0ae712a9df710aa03311a32c8c)]
>
> Calc Engine:   at 
> org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
>
> Calc Engine:   at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>
> Calc Engine:   at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>
> Calc Engine:   ... 3 more
>
> This is the first job to fail with the above exception. From the
> JobManager logs, we can see that the job goes to FINISHED state, and then
> we see the following exception:
>
> 2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink
> Java Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c)
> switched from state RUNNING to FINISHED.
>
> 2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job
> d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
>
> 2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28]
> org.apache.flink.runtime.jobmaster.JobMaster  - Stopping
> the JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT
> 2021(d0991f0ae712a9df710aa03311a32c8c).
>
> 2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39]
> org.apache.flink.yarn.YarnResourceManager - Disconnect
> job manager
> 0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
> for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
>
> 2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91]
> org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  -
> Exception occurred in REST handler:
> org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find
> Flink job (d0991f0ae712a9df710aa03311a32c8c)
>
>
>
> Here are the relevant logs from the TaskManager. We can see that the
> JobLeaderService tries to reconnect to the job. Any ideas as to why it is
> trying to reconnect?
>
>
> 2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
> request b26c04706fd5aad03dfdca8691f1bf1c for job 
> d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
> .
>
> 2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
> d0991f0ae712a9df710aa03311a32c8c for job leader monitoring.
>
> 2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.JobLeaderService- Successful 
> registration at job manager 
> akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392 for job 
> d0991f0ae712a9df710aa03311a32c8c.
>
> 2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Establish 
> JobManager connection for job d0991f0ae712a9df710aa03311a32c8c.
>
> 2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Offer 
> reserved slots to the leader of job d0991f0ae712a9df710aa03311a32c8c.
>
> 2021-09-28 04:54:13,405 INFO  [CHAIN DataSource (settl_delivery_type_code | 
> DistCp | Sourcing Files) -> FlatMap (settl_delivery_type_code | DistCp | 
>

Re: [ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Theo Diefenthal


Awesome, thanks for the release.

- Original Message -
From: "Dawid Wysakowicz"
To: "dev" , "user" ,
annou...@apache.org
Sent: Wednesday, September 29, 2021 15:59:47
Subject: [ANNOUNCE] Apache Flink 1.14.0 released

The Apache Flink community is very happy to announce the release of
Apache Flink 1.14.0.
 
Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.
 
The release is available for download at:
https://flink.apache.org/downloads.html
 
Please check out the release blog post for an overview of the
improvements in this release:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
 
The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
 
We would like to thank all contributors of the Apache Flink community
who made this release possible!
 
Regards,
Xintong, Joe, Dawid


[ANNOUNCE] Apache Flink 1.14.0 released

2021-09-29 Thread Dawid Wysakowicz
The Apache Flink community is very happy to announce the release of
Apache Flink 1.14.0.
 
Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data
streaming applications.
 
The release is available for download at:
https://flink.apache.org/downloads.html
 
Please check out the release blog post for an overview of the
improvements in this release:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html
 
The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12349614
 
We would like to thank all contributors of the Apache Flink community
who made this release possible!
 
Regards,
Xintong, Joe, Dawid






FlinkJobNotFoundException

2021-09-29 Thread Gusick, Doug S
Hello,

We are facing an issue with some of our applications that are submitting a high
volume of jobs to Flink (we are using v1.9.2). We are observing that numerous
jobs (in this case 44 out of 350+) fail with the same FlinkJobNotFoundException
within a 45-second timeframe.

From our client logs, this is the exception we can see:


Calc Engine: Caused by: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find Flink job (d0991f0ae712a9df710aa03311a32c8c)]
Calc Engine:   at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:389)
Calc Engine:   at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:373)
Calc Engine:   at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
Calc Engine:   at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
Calc Engine:   at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
Calc Engine:   ... 3 more
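
This is a minimal sketch and is not part of the original report. In the trace, the server-side FlinkJobNotFoundException reaches the client as text inside a RestClientException message. A client that wants to count these failures separately from other errors could walk the cause chain and match on that name. The class and method names are hypothetical. Matching on the message text is an assumption based only on the trace above.

```java
// Hypothetical helper: detects a FlinkJobNotFoundException anywhere in a
// cause chain by name, because the REST client reports it as message text.
final class JobNotFoundCheck {
    private static final String MARKER = "FlinkJobNotFoundException";
    private static final int MAX_DEPTH = 32; // guards against cyclic cause chains

    static boolean isJobNotFound(Throwable t) {
        Throwable current = t;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            // The server exception name appears inside the wrapper's message.
            String message = current.getMessage();
            if (message != null && message.contains(MARKER)) {
                return true;
            }
            // Also match if the exception type itself carries the name.
            if (current.getClass().getName().endsWith(MARKER)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
```

The chain usually starts at a CompletionException or ExecutionException, because the call goes through CompletableFuture. That is why the helper inspects every cause rather than only the top-level exception.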


This is the first job to fail with the above exception. From the JobManager
logs, we can see that the job goes to FINISHED state, and then we see the
following exception:

2021-09-28 04:54:16,936 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink Java 
Job at Tue Sep 28 04:48:21 EDT 2021 (d0991f0ae712a9df710aa03311a32c8c) switched 
from state RUNNING to FINISHED.
2021-09-28 04:54:16,937 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
d0991f0ae712a9df710aa03311a32c8c reached globally terminal state FINISHED.
2021-09-28 04:54:16,939 INFO  [flink-akka.actor.default-dispatcher-28] 
org.apache.flink.runtime.jobmaster.JobMaster  - Stopping the 
JobMaster for job Flink Java Job at Tue Sep 28 04:48:21 EDT 
2021(d0991f0ae712a9df710aa03311a32c8c).
2021-09-28 04:54:16,940 INFO  [flink-akka.actor.default-dispatcher-39] 
org.apache.flink.yarn.YarnResourceManager - Disconnect job 
manager 
0...@akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392
 for job d0991f0ae712a9df710aa03311a32c8c from the resource manager.
2021-09-28 04:54:18,256 ERROR [flink-akka.actor.default-dispatcher-91] 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler  - 
Exception occurred in REST handler: 
org.apache.flink.runtime.messages.FlinkJobNotFoundException: Could not find 
Flink job (d0991f0ae712a9df710aa03311a32c8c)
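
The following small sketch makes the ordering in these JobManager lines explicit. It computes the gap between the FINISHED transition at 04:54:16,936 and the REST handler error at 04:54:18,256, using the log's timestamp layout. The helper names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for comparing timestamps copied from the JobManager log.
final class LogGap {
    // Layout of the timestamps above, e.g. "2021-09-28 04:54:16,936".
    static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds from the earlier log line to the later one.
    static long gapMillis(String earlier, String later) {
        LocalDateTime a = LocalDateTime.parse(earlier, LOG_TS);
        LocalDateTime b = LocalDateTime.parse(later, LOG_TS);
        return Duration.between(a, b).toMillis();
    }
}
```

`LogGap.gapMillis("2021-09-28 04:54:16,936", "2021-09-28 04:54:18,256")` returns 1320. The failed job lookup was therefore handled about 1.3 seconds after the job had reached its globally terminal state.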


Here are the relevant logs from the TaskManager. We can see that the
JobLeaderService tries to reconnect to the job. Any ideas as to why it is
trying to reconnect?

2021-09-28 04:54:13,382 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request b26c04706fd5aad03dfdca8691f1bf1c for job 
d0991f0ae712a9df710aa03311a32c8c from resource manager with leader id 
.

2021-09-28 04:54:13,383 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Add job 
d0991f0ae712a9df710aa03311a32c8c for job leader monitoring.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.JobLeaderService- Successful 
registration at job manager 
akka.tcp://fl...@d43723-714.dc.gs.com:44887/user/jobmanager_392 for job 
d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Establish 
JobManager connection for job d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,397 INFO  [flink-akka.actor.default-dispatcher-16] 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Offer reserved 
slots to the leader of job d0991f0ae712a9df710aa03311a32c8c.

2021-09-28 04:54:13,405 INFO  [CHAIN DataSource (settl_delivery_type_code | 
DistCp | Sourcing Files) -> FlatMap (settl_delivery_type_code | DistCp | 
Copying Batch Data) (1/1)] org.apache.flink.runtime.blob.BlobClient 
 - Downloading 
d0991f0ae712a9df710aa03311a32c8c/p-54dfdc41d9a995e5b75eb9eb29bcac91725fc425-be2bc5df9ef6ce331e8019cf32eb222b
 from d43723-714.dc.gs.com/10.175.239.171:38726

2021-09-28 04:54:16,942 INFO  [flink-akka.actor.default-dispatcher-3] 
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable  - Free slot 
TaskSlot(index:0, state:ACTIVE, resource profile: 
ResourceProfile{cpuCores=1.7976931348623157E308, heapMemoryInMB=2147483647, 
directMemoryInMB=2147483647, nativeMemoryInMB=2147483647, 
networkMemoryInMB=2147483647, managedMemoryInMB=9318}, allocationId: 
b26c04706fd5aad03dfdca8691f1bf1c, jobId: d0991f0ae712a9df710aa03311a

RE: Event is taking a lot of time between the operators

2021-09-29 Thread Sanket Agrawal
Thank you @Piotr Nowojski for helping me.

From: Piotr Nowojski 
Sent: Wednesday, September 29, 2021 12:53 PM
To: Sanket Agrawal 
Cc: Ragini Manjaiah ; user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


Hi Sanket,

As I mentioned in the previous email, it's most likely still an issue of
backpressure, and you can check it as I described in that message. Your
records are stuck in the network buffers between the two async operations (I),
if there is a network exchange, and/or inside the `AsyncWaitOperator`'s internal
queue (II). If it's causing you problems:

I. For the former problem (network buffers), you can:
a) get rid of the network exchange by removing keyBy/shuffle/rebalance (might
not be feasible, depending on your business logic)
b) reduce the amount of in-flight data. Flink 1.14 adds an automatic buffer
debloating mechanism, which you cannot use in Flink 1.8, but you can manually
tweak both the number and the size of the buffers. You can read about it
here [1]; just ignore the automatic buffer debloating mechanism.
II. For the latter problem, you can change the size of the internal queue by
adjusting the `capacity` parameter [2].
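
For point I.b, here is a rough way to see how much data the network stack can hold in front of an operator. This is a sketch under assumptions. It counts only the receiving input gate: the exclusive buffers per channel plus the floating buffers per gate, multiplied by the memory segment size. It ignores output-side buffers. The example uses these defaults, which should be checked against the configuration docs for your Flink version:

- 2 buffers per channel (taskmanager.network.memory.buffers-per-channel)
- 8 floating buffers per gate (taskmanager.network.memory.floating-buffers-per-gate)
- 32 KiB segments (taskmanager.memory.segment-size)

```java
// Hypothetical estimate of input-side network buffering for one input gate.
final class InFlightBuffers {
    // (channels * exclusive buffers per channel + floating buffers per gate)
    // times the size of one memory segment, in bytes.
    static long inputGateBytes(int channels, int buffersPerChannel,
                               int floatingBuffersPerGate, long segmentSizeBytes) {
        long buffers = (long) channels * buffersPerChannel + floatingBuffersPerGate;
        return buffers * segmentSizeBytes;
    }
}
```

With one input channel (parallelism 1) and those defaults, `inputGateBytes(1, 2, 8, 32 * 1024)` gives 327,680 bytes. How many records that is depends on the record size.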

The more buffered in-flight data you have between operators, the longer the 
delay between processing the same record by two different operators.
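
The last point can be made concrete with Little's law. This is our choice of model, not something stated in the thread. Little's law says the average time a record spends in a queue equals the number of records in flight divided by the throughput. Here is a minimal sketch with hypothetical names:

```java
// Back-of-the-envelope queueing estimate based on Little's law.
final class QueueDelayEstimate {
    // Average time in the system = records in flight / throughput.
    static double delaySeconds(double inFlightRecords, double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("throughput must be positive");
        }
        return inFlightRecords / recordsPerSecond;
    }

    // Throughput ceiling when at most `concurrency` calls overlap and each
    // call takes `latencySeconds`.
    static double maxThroughput(int concurrency, double latencySeconds) {
        if (latencySeconds <= 0) {
            throw new IllegalArgumentException("latency must be positive");
        }
        return concurrency / latencySeconds;
    }
}
```

The thread gives a capacity of 1000 and about 1.3 s per call. With those numbers, `delaySeconds(1000, maxThroughput(1000, 1.3))` is about 1.3 s, but only if all 1000 calls really overlap. Suppose instead that only 3 calls could run at once (a hypothetical figure, for example a small thread pool behind supplyAsync). Then `delaySeconds(1000, maxThroughput(3, 1.3))` is roughly 433 s.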

Best,
Piotrek

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api



On Wed, Sep 29, 2021 at 08:20 Sanket Agrawal <sanket.agra...@infosys.com> wrote:
Hi Ragini,

For measuring time in an async, we have put a logger as the first and the last
statement in asyncInvoke, and for measuring time between the asyncs we are
simply computing message2's start time minus message1's end time. Also, we
are using a parallelism of 1.

Please let me know if you need any other information or if you have any 
recommendations on improving the approach.
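
One caveat is worth checking with this approach. If asyncInvoke only hands the work to supplyAsync and returns, log statements at its start and end mostly measure the submission, not the external call. The minimal sketch below instead times the future from submission to completion. The names are hypothetical, and it uses plain Java rather than Flink's API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical timing wrapper for asynchronous work.
final class AsyncTiming {
    // Records the start time, starts the async work, and reports the elapsed
    // milliseconds once the future completes (normally or exceptionally).
    static <T> CompletableFuture<T> timed(Supplier<CompletableFuture<T>> start,
                                          LongConsumer elapsedMillis) {
        long t0 = System.nanoTime();
        return start.get().whenComplete((value, error) ->
                elapsedMillis.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0)));
    }
}
```

Inside asyncInvoke, the supplier would wrap the existing supplyAsync call, and the consumer could write the elapsed time for each message to your logger.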

Thanks,
Sanket Agrawal

From: Ragini Manjaiah <ragini.manja...@gmail.com>
Sent: Wednesday, September 29, 2021 11:17 AM
To: Sanket Agrawal <sanket.agra...@infosys.com>
Cc: Piotr Nowojski <pnowoj...@apache.org>; user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


Hi Sanket,
I have a similar use case. How are you measuring the time for the `Async1`
function to return the result, and for the external API call?

On Wed, Sep 29, 2021 at 10:47 AM Sanket Agrawal <sanket.agra...@infosys.com> wrote:
Hi @Piotr Nowojski,

Thank you for replying. Yes, the first async is taking between 1300 and 1500
milliseconds, but that work runs inside CompletableFuture.supplyAsync, and the
async capacity is set to 1000.

Async Code Structure: Inside asyncInvoke we are calling
CompletableFuture.supplyAsync, and inside supplyAsync we are calling an external
API, which takes around 1005 ms to 1040 ms. The rest of the code for request
creation/response validation is also inside the supplyAsync and takes
around 250 ms.

This way, we tried to make sure that the main async thread (since the async
operator does not use multiple threads directly) is available for the next
message as soon as it calls CompletableFuture.supplyAsync on the current message.
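
Below is a stdlib-only sketch of the structure described above. Two things are assumptions for illustration: the Flink ResultFuture is replaced by plain CompletableFutures, and the external API call is simulated with Thread.sleep. One detail is worth checking. CompletableFuture.supplyAsync without an executor argument runs on ForkJoinPool.commonPool(). That pool's parallelism is typically one less than the number of available processors, so only that many blocking calls overlap, regardless of the operator's capacity. The sketch passes an explicit pool instead; the pool size is a hypothetical tuning knob.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncCallSketch {
    // Hypothetical stand-in for the blocking external API call (~1 s in the thread).
    static String callExternalApi(String request, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "response-for-" + request;
    }

    // Roughly what asyncInvoke does in the thread: hand the blocking work to
    // supplyAsync, but on an explicit executor instead of the common pool.
    static CompletableFuture<String> invoke(String request, long latencyMillis,
                                            ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> callExternalApi(request, latencyMillis), pool);
    }

    // Submits every request first, then waits for the results in submission order.
    static List<String> runAll(List<String> requests, long latencyMillis, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String r : requests) {
                futures.add(invoke(r, latencyMillis, pool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : futures) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real AsyncFunction, the returned future would typically pass its result to the operator by calling resultFuture.complete(Collections.singleton(result)) from a whenComplete or thenAccept callback.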

Thanks,
Sanket Agrawal

From: Piotr Nowojski <pnowoj...@apache.org>
Sent: Tuesday, September 28, 2021 8:02 PM
To: Sanket Agrawal <sanket.agra...@infosys.com>
Cc: user@flink.apache.org
Subject: Re: Event is taking a lot of time between the operators


Hi,

With Flink 1.8.0, I'm not sure how reliable the backpressure status in the
WebUI is when it comes to the Async operators. If I remember correctly, until
around Flink 1.10 (+/- 2 versions) backpressure monitoring was checking for
thread dumps stuck in requesting Fli

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Javier Vegas
Thanks again, Matthias!

Putting -Djobmanager.rpc.address=$HOST and -Djobmanager.rpc.port=$PORT0
as params for appmaster.sh, I see in the log that they seem to be
transformed into the correct values:

-Djobmanager.rpc.address=10.0.23.35 -Djobmanager.rpc.port=31009

but a bit later the appmaster dies with this new error. It is unclear which
address it is trying to bind to. I added the explicit params
-Drest.bind-port=8081 and -Drest.port=8081 in case jobmanager.rpc.port was
somehow interfering, but that didn't help.

2021-09-29 10:29:59.845 [main] INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
MesosSessionClusterEntrypoint down with application status FAILED.
Diagnostics java.net.BindException: Cannot assign requested address
at java.base/sun.nio.ch.Net.bind0(Native Method)
at java.base/sun.nio.ch.Net.bind(Unknown Source)
at java.base/sun.nio.ch.Net.bind(Unknown Source)
at java.base/sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at 
org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel.doBind(NioServerSocketChannel.java:134)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe.bind(AbstractChannel.java:550)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.bind(DefaultChannelPipeline.java:1334)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeBind(AbstractChannelHandlerContext.java:506)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.bind(AbstractChannelHandlerContext.java:491)
at 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.bind(DefaultChannelPipeline.java:973)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel.bind(AbstractChannel.java:248)
at 
org.apache.flink.shaded.netty4.io.netty.bootstrap.AbstractBootstrap$2.run(AbstractBootstrap.java:356)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at 
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)


.


On Wed, Sep 29, 2021 at 2:36 AM Matthias Pohl 
wrote:

> The port has its own separate configuration parameter, jobmanager.rpc.port [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
>
> On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:
>
>> Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
>> param from $HOSTNAME to $HOST:$PORT0, which, as I see in the log, resolves
>> properly to the host IP and the port mapped to 8081
>>
>> 2021-09-29 07:58:05.452 [main] INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
>> -Djobmanager.rpc.address=10.0.22.114:31894
>>
>> which is very promising. But sadly, a little later the appmaster dies with
>> this error:
>>
>> 2021-09-29 07:58:05.648 [main] INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
>> cluster services.
>> 2021-09-29 07:58:05.674 [main] INFO
>>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
>> MesosSessionClusterEntrypoint down with application status FAILED.
>> Diagnostics
>> org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
>> at
>> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
>> at
>> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
>> at
>> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
>> at
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
>> at
>> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(

Re: flink rest endpoint creation failure

2021-09-29 Thread Matthias Pohl
Hi Curt,
could you elaborate a bit more on your setup? Maybe provide the commands you
used to deploy the jobs and the Flink/YARN logs. What's puzzling me is your
statement about "two JobManagers spinning up" and "everything's working
fine if two TaskManagers are running on different instances".
- When talking about Flink applications, you're talking about application
mode?
- I have the feeling you're mixing up JobManager and TaskManager in your
initial description. Could you clarify this?
- Actually, each of the Flink components (JobManager and TaskManager)
should run in its own YARN container. The way you describe it, it sounds
like Flink runs within one container?

Best,
Matthias



On Thu, Sep 23, 2021 at 5:14 PM Curt Buechter  wrote:

> Thanks Robert,
> But, no, the rest.bind-port is not set to 35485 in the configuration.
> Other jobs use different ports, so it is getting set dynamically.
>
>
> #==
> # Rest & web frontend
>
> #==
>
> # The port to which the REST client connects to. If rest.bind-port has
> # not been specified, then the server will bind to this port as well.
> #
> #rest.port: 8081
>
> # The address to which the REST client will connect to
> #
> #rest.address: 0.0.0.0
>
> # Port range for the REST and web server to bind to.
> #
> #rest.bind-port: 8080-8090
>
> # The address that the REST & web server binds to
> #
> #rest.bind-address: 0.0.0.0
>
> # Flag to specify whether job submission is enabled from the web-based
> # runtime monitor. Uncomment to disable.
>
> #web.submit.enable: false
>
>
>
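
The commented-out rest.bind-port entry above takes a port range. Below is a sketch of the idea under simple assumptions; the class is hypothetical and uses plain java.net rather than Flink's REST server. It expands a range such as "8080-8090", or a list such as "50100,50110-50111", into ports and binds the first free one. A second process on the same host would then fall through to the next port. The error further down, "Could not start rest endpoint on any port in port range 35485", reads like what happens when the "range" contains a single port that is already taken. Flink's own RestServerEndpoint may differ in details.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating "try each port in a range until one binds".
final class PortRangeSketch {
    // Expands "8080-8090" or "50100,50110-50111" into individual port numbers.
    static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            int dash = p.indexOf('-');
            int from = Integer.parseInt(dash < 0 ? p : p.substring(0, dash).trim());
            int to = dash < 0 ? from : Integer.parseInt(p.substring(dash + 1).trim());
            for (int port = from; port <= to; port++) {
                if (port < 0 || port > 65535) {
                    throw new IllegalArgumentException("invalid port: " + port);
                }
                ports.add(port);
            }
        }
        return ports;
    }

    // Returns a socket bound to the first free port in order, or null if none
    // could be bound. The caller is responsible for closing the socket.
    static ServerSocket bindFirstFree(List<Integer> ports) {
        for (int port : ports) {
            try {
                return new ServerSocket(port);
            } catch (IOException e) {
                // Port already in use (or not permitted): try the next one.
            }
        }
        return null;
    }
}
```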
> On Wed, Sep 22, 2021 at 11:46 AM Curt Buechter 
> wrote:
>
>> Hi,
>> I'm getting an error that happens randomly when starting a Flink
>> application.
>>
>> For context, this is running in YARN on AWS. This application is one that
>> converts from the Table API to the Stream API, so two Flink
>> applications/JobManagers are trying to start up. I think what happens is
>> that the REST API port is chosen, and is the same for both of the Flink
>> apps. If YARN chooses two different instances for the two task managers,
>> they each work fine and start their REST API on the same port on their own
>> respective machine. But if YARN chooses the same instance for both job
>> managers, they both try to start up the REST API on the same port on the
>> same machine, and I get the error.
>>
>> Here is the error:
>>
>> 2021-09-22 15:47:27,724 ERROR 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Could not 
>> start cluster entrypoint YarnJobClusterEntrypoint.
>> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
>> initialize the cluster entrypoint YarnJobClusterEntrypoint.
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
>>  [flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:99)
>>  [flink-dist_2.12-1.13.2.jar:1.13.2]
>> Caused by: org.apache.flink.util.FlinkException: Could not create the 
>> DispatcherResourceManagerComponent.
>>  at 
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:275)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:250)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at java.security.AccessController.doPrivileged(Native Method) 
>> ~[?:1.8.0_282]
>>  at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_282]
>>  at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
>>  ~[hadoop-common-3.2.1-amzn-3.jar:?]
>>  at 
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  ... 2 more
>> Caused by: java.net.BindException: Could not start rest endpoint on any port 
>> in port range 35485
>>  at 
>> org.apache.flink.runtime.rest.RestServerEndpoint.start(RestServerEndpoint.java:234)
>>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>>  at 
>> org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:172)
>>  ~[flink-dist_2.12-1.13.2

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
The port has its own separate configuration parameter, jobmanager.rpc.port [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#jobmanager-rpc-port-1
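
Following that advice, the address and the port go into two separate options. The small sketch below turns a host:port value, like the one in the log below, into the two dynamic properties. The helper is hypothetical and handles only IPv4 addresses or hostnames.

```java
// Hypothetical helper: splits "host:port" into separate -D options.
final class RpcAddressArgs {
    static String[] toDynamicProperties(String hostAndPort) {
        int idx = hostAndPort.lastIndexOf(':');
        if (idx <= 0 || idx == hostAndPort.length() - 1) {
            throw new IllegalArgumentException("expected host:port, got " + hostAndPort);
        }
        String host = hostAndPort.substring(0, idx);
        int port = Integer.parseInt(hostAndPort.substring(idx + 1));
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("port out of range: " + port);
        }
        // The address option carries only the host; the port gets its own option.
        return new String[] {
            "-Djobmanager.rpc.address=" + host,
            "-Djobmanager.rpc.port=" + port
        };
    }
}
```

`toDynamicProperties("10.0.22.114:31894")` yields `-Djobmanager.rpc.address=10.0.22.114` and `-Djobmanager.rpc.port=31894`.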

On Wed, Sep 29, 2021 at 10:11 AM Javier Vegas  wrote:

> Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
> param from $HOSTNAME to $HOST:$PORT0, which, as I see in the log, resolves
> properly to the host IP and the port mapped to 8081
>
> 2021-09-29 07:58:05.452 [main] INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
> -Djobmanager.rpc.address=10.0.22.114:31894
>
> which is very promising. But sadly, a little later the appmaster dies with
> this error:
>
> 2021-09-29 07:58:05.648 [main] INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
> cluster services.
> 2021-09-29 07:58:05.674 [main] INFO
>  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
> MesosSessionClusterEntrypoint down with application status FAILED.
> Diagnostics
> org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
> at
> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
> at
> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
> at java.base/java.security.AccessController.doPrivileged(Native Method)
> at java.base/javax.security.auth.Subject.doAs(Unknown Source)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
> Caused by: java.lang.IllegalArgumentException
> at
> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
> at
> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:177)
> ... 17 more
> .
> 2021-09-29 07:58:05.685 [main] ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Could not start
> cluster entrypoint MesosSessionClusterEntrypoint.
> org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to
> initialize the cluster entrypoint MesosSessionClusterEntrypoint.
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
> Caused by: org.apache.flink.configuration.IllegalConfigurationException:
> The configured hostname is not valid
> at
> org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
> at
> org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
> at
> org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
> at
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
> at
> org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Javier Vegas
Matthias, thanks for the suggestion! I changed my jobmanager.rpc.address
param from $HOSTNAME to $HOST:$PORT0, which, as I see in the log, resolves
properly to the host IP and the port mapped to 8081

2021-09-29 07:58:05.452 [main] INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -
-Djobmanager.rpc.address=10.0.22.114:31894

which is very promising. But sadly, a little later the appmaster dies with
this error:

2021-09-29 07:58:05.648 [main] INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Initializing
cluster services.
2021-09-29 07:58:05.674 [main] INFO
 org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Shutting
MesosSessionClusterEntrypoint down with application status FAILED.
Diagnostics
org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
at
org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
at
org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
at
org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
at
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:186)
at
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
at
org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
Caused by: java.lang.IllegalArgumentException
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:177)
... 17 more
.
2021-09-29 07:58:05.685 [main] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Could not start cluster entrypoint MesosSessionClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint MesosSessionClusterEntrypoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:212)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:600)
at org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.main(MesosSessionClusterEntrypoint.java:114)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The configured hostname is not valid
at org.apache.flink.util.NetUtils.unresolvedHostToNormalizedString(NetUtils.java:179)
at org.apache.flink.util.NetUtils.unresolvedHostAndPortToNormalizedString(NetUtils.java:197)
at org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:207)
at org.apache.flink.runtime.clusterframework.BootstrapTools.startRemoteActorSystem(BootstrapTools.java:152)
at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:370)
at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils$AkkaRpcServiceBuilder.createAndStart(AkkaRpcServiceUtils.java:344)
at org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils.createRemoteRpcService(AkkaRpcServiceUtils.java:92)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:294)
at org.apache.flink.mesos.entrypoint.MesosSessionClusterEntrypoint.initializeServices(MesosSessionClusterEntrypoint.java:61)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:239)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:189)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.

Re: Event is taking a lot of time between the operators

2021-09-29 Thread Piotr Nowojski
Hi Sanket,

As I mentioned in the previous email, it's most likely still an issue of
backpressure, and you can check it as I described in that message. Your
records are stuck in the network buffers between the two async operations
(I) (if there is a network exchange), inside the `AsyncWaitOperator`'s
internal queue (II), or both. If this is causing you problems:

I. For the former problem (network buffers) you can:
a) get rid of the network exchange by removing keyBy/shuffle/rebalance
(which might not be feasible, depending on your business logic)
b) reduce the amount of in-flight data. In Flink 1.14 we are adding an
automatic buffer debloating mechanism. You cannot use it in Flink 1.8, but
you can manually tweak both the number and the size of the buffers. You can
read about it here [1]; just ignore the parts about the automatic buffer
debloating mechanism.
II. You can change the size of the internal queue by adjusting the
`capacity` parameter [2].

The more buffered in-flight data you have between operators, the longer the
delay between two different operators processing the same record.
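A rough way to put numbers on this, assuming the operator is backpressured so that the buffers and the queue stay full: by Little's law, a newly arriving record waits about (records queued ahead of it) / (records per second the slow operator completes). The sketch below applies that to the network buffers and to a full `AsyncWaitOperator` queue. The class name and every number in `main` are hypothetical placeholders, not Flink defaults or measurements from this thread.

```java
import java.util.Locale;

// Rough Little's-law estimate of how long a record waits behind the data already in flight.
// All numbers in main() are hypothetical placeholders, not Flink defaults.
final class InFlightDelayEstimate {

    /** Number of records that fit into the given network buffers. */
    static long recordsInNetworkBuffers(int buffers, int bufferSizeBytes, int recordSizeBytes) {
        return (long) buffers * bufferSizeBytes / recordSizeBytes;
    }

    /** Little's law: waiting time = records queued ahead / records drained per second. */
    static double delaySeconds(long recordsInFlight, double recordsPerSecond) {
        return recordsInFlight / recordsPerSecond;
    }

    public static void main(String[] args) {
        // Hypothetical: 10 buffers of 32 KiB carrying 1 KiB records between the two operators.
        long networkRecords = recordsInNetworkBuffers(10, 32 * 1024, 1024);
        // Hypothetical: the async operator's queue (capacity 1000) is completely full.
        long asyncQueueRecords = 1000;
        // Hypothetical: records per second the slow operator actually completes.
        double throughput = 100.0;

        System.out.println(String.format(Locale.ROOT, "network buffers: %.1f s, async queue: %.1f s",
                delaySeconds(networkRecords, throughput), delaySeconds(asyncQueueRecords, throughput)));
    }
}
```

With these placeholder numbers it prints `network buffers: 3.2 s, async queue: 10.0 s`. The queue term scales directly with `capacity`, which is why a smaller capacity or fewer buffers shorten the wait, as long as throughput does not drop along with them.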

Best,
Piotrek

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/network_mem_tuning/
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api



On Wed, Sep 29, 2021 at 08:20 Sanket Agrawal
wrote:

> Hi Ragini,
>
>
>
> To measure the time spent in an async operator, we log at the first and the
> last statement of asyncInvoke. To measure the time between the two async
> operators, we simply subtract message1's end time from message2's start
> time. Also, we are using a parallelism of 1.
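Logging at the last statement of asyncInvoke only measures how long it takes to *submit* the work: when the API call runs inside supplyAsync, asyncInvoke returns long before the call finishes. One way to capture the full duration is to take the start timestamp when the request is submitted and compute the elapsed time in a callback on the future (in a Flink AsyncFunction, that would be the same callback that completes the `ResultFuture`). The sketch below is plain Java with a sleep standing in for the external API; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Measures an asynchronous call from submission until its future completes (hypothetical helper).
final class AsyncLatency {

    /** Runs the work asynchronously; the returned future completes with the elapsed time in ms. */
    static CompletableFuture<Long> timed(Runnable work, ExecutorService executor) {
        long start = System.nanoTime(); // taken at hand-off, like the first line of asyncInvoke
        return CompletableFuture
                .runAsync(work, executor)
                // Runs only after the work has finished, unlike a log line at the end of asyncInvoke.
                .thenApply(ignored -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
    }

    /** Stand-in for the blocking external API call. */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            long submitStart = System.nanoTime();
            CompletableFuture<Long> latency = timed(() -> sleep(200), executor);
            long submitMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - submitStart);
            // submitMs is close to 0; latency.get() is at least ~200 ms.
            System.out.println("time to submit: " + submitMs + " ms, time to complete: " + latency.get() + " ms");
        } finally {
            executor.shutdown();
        }
    }
}
```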
>
>
>
> Please let me know if you need any other information or if you have any
> recommendations on improving the approach.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>
> *From:* Ragini Manjaiah 
> *Sent:* Wednesday, September 29, 2021 11:17 AM
> *To:* Sanket Agrawal 
> *Cc:* Piotr Nowojski ; user@flink.apache.org
> *Subject:* Re: Event is taking a lot of time between the operators
>
>
>
> [**EXTERNAL EMAIL**]
>
> Hi Sanket,
>
> I have a similar use case. How are you measuring the time it takes for the
> `Async1` function to return the result, and for the external API call?
>
>
>
> On Wed, Sep 29, 2021 at 10:47 AM Sanket Agrawal <
> sanket.agra...@infosys.com> wrote:
>
> Hi @Piotr Nowojski ,
>
>
>
> Thank you for replying. Yes, the first async operator takes between 1300 and
> 1500 milliseconds, but that work is done inside CompletableFuture.*supplyAsync*,
> and the async capacity is set to 1000.
>
>
>
> *Async Code Structure*: Inside asyncInvoke we call
> CompletableFuture.*supplyAsync*, and inside *supplyAsync* we call an
> external API, which takes around 1005 ms to 1040 ms. The rest of the code
> for request creation/response validation is also inside *supplyAsync* and
> takes around 250 ms.
>
>
>
> This way, we intended the main async thread (as the async operator does not
> use multiple threads directly) to be available for the next message as soon
> as it calls CompletableFuture.supplyAsync on the current message.
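One thing worth checking here: if supplyAsync is called without an executor argument (the message above doesn't say either way), the work runs on `ForkJoinPool.commonPool()`, whose parallelism defaults to one less than the number of available cores. A blocking ~1 s API call occupies a pool thread for its whole duration, so far fewer than the configured capacity of 1000 requests can actually be in progress at once, and the rest wait in the operator's queue. The sketch below compares the common pool with a dedicated pool passed to `supplyAsync(Supplier, Executor)`; `callExternalApi` is a hypothetical stand-in that just sleeps, and the pool size is an illustrative choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

// Blocking "API calls" on the common pool vs. the same calls on a dedicated pool.
final class DedicatedExecutorDemo {

    /** Hypothetical stand-in for the blocking external API call. */
    static String callExternalApi(int request) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response-" + request;
    }

    /** Submits n blocking calls and returns the wall-clock ms until all have finished. */
    static long runAll(int n, ExecutorService executor) {
        long start = System.nanoTime();
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            final int request = i;
            CompletableFuture<String> future = (executor == null)
                    ? CompletableFuture.supplyAsync(() -> callExternalApi(request))            // common pool
                    : CompletableFuture.supplyAsync(() -> callExternalApi(request), executor); // dedicated pool
            futures.add(future);
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        int n = 32;
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("common pool: " + runAll(n, null) + " ms");

        // Sized to the number of requests that should be in flight at once (e.g. the async capacity).
        ExecutorService ioPool = Executors.newFixedThreadPool(n);
        try {
            System.out.println("dedicated pool: " + runAll(n, ioPool) + " ms");
        } finally {
            ioPool.shutdown();
        }
    }
}
```

On a machine with one or two cores the common-pool run may not look slow, because `CompletableFuture` then falls back to a new thread per task; on larger machines the common-pool timing grows roughly with `n / parallelism`, while the dedicated pool finishes in about one call's duration.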
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Tuesday, September 28, 2021 8:02 PM
> *To:* Sanket Agrawal 
> *Cc:* user@flink.apache.org
> *Subject:* Re: Event is taking a lot of time between the operators
>
>
>
>
> Hi,
>
>
>
> With Flink 1.8.0 I'm not sure how reliable the backpressure status is in
> the WebUI when it comes to the Async operators. If I remember correctly,
> until around Flink 1.10 (+/- 2 versions) backpressure monitoring was
> checking thread dumps for threads stuck requesting Flink's network memory
> buffers. If in your job an AsyncFunction is the source of backpressure, it
> would be skipped and not reported. For analysing backpressure I would
> highly recommend upgrading to Flink 1.13.x, as it has greatly improved
> tooling for that [1]. And in that version AsyncFunctions are definitely
> handled correctly. Since Flink 1.10, I believe, you can use the
> `isBackPressured` metric. In previous versions you would have to rely on
> buffer usage metrics as described here [2].
>
>
>
>
>
> [1] https://flink.apache.org/2021/07/07/backpressure.html
> 
>
> [2]
> https://flink.apache.org/2019/07/23/flink-network-stack-2.html#network-metrics
> 

Re: Unable to connect to Mesos on mesos-appmaster.sh start

2021-09-29 Thread Matthias Pohl
One thing that was puzzling me yesterday when reading your post: Have you
tried $HOST instead of $HOSTNAME in the Marathon configuration? When I
played around with Mesos, I remember using HOST to resolve the host's IP
address instead of the host's name. It could be that the hostname itself
cannot be resolved to the right IP address. But I struggled to find proper
documentation to back that up; the only place I found HOST used as well
was the recipes section of the Marathon docs [1].
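Before switching to $HOST, it may help to check what the current value actually looks like. The stack trace earlier on this page ends in `IllegalConfigurationException: The configured hostname is not valid`, thrown from `NetUtils.unresolvedHostToNormalizedString`. As far as I know, that method rejects hostnames that start or end with `.` or contain `:`, but treat that rule as an assumption and check the `NetUtils` source for your Flink version. The sketch below is a hypothetical plain-Java pre-flight check (class and method names are made up) that applies those rules plus an emptiness check of its own, and then tries to resolve the name with the JVM's resolver.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical pre-flight check for the hostname handed to Flink (e.g. $HOSTNAME or $HOST).
final class HostnameCheck {

    /**
     * Rejects empty names and names that start or end with '.' or contain ':'.
     * The last three rules are assumed to mirror Flink's NetUtils check; IPv6 literals,
     * which Flink handles separately, would be rejected here.
     */
    static boolean looksLikeValidHostname(String host) {
        if (host == null) {
            return false;
        }
        String h = host.trim();
        return !h.isEmpty() && !h.startsWith(".") && !h.endsWith(".") && !h.contains(":");
    }

    /** Resolves the name with the JVM's resolver; returns null if it cannot be resolved. */
    static String resolve(String host) {
        try {
            return InetAddress.getByName(host).getHostAddress();
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : System.getenv("HOSTNAME");
        System.out.println("host:     " + host);
        System.out.println("valid:    " + looksLikeValidHostname(host));
        System.out.println("resolves: " + (host == null ? null : resolve(host)));
    }
}
```

Running it inside the container, e.g. `java HostnameCheck.java "$HOSTNAME"` and then with `"$HOST"` on Java 11+, shows whether each value passes the format check and which IP address it resolves to.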

Matthias

[1]
https://mesosphere.github.io/marathon/docs/recipes.html#command-executor-health-checks

On Wed, Sep 29, 2021 at 3:37 AM Javier Vegas  wrote:

> Another update: looking more carefully at my appmaster log, I see the
> following:
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -
> Registering as new framework.
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -
> -
>
> ---
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -  Mesos
> Info:
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Master
> URL: 10.0.18.246:5050
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -  Framework
> Info:
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - ID:
> (none)
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Name:
> flink-test
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Failover
> Timeout (secs): 604800.0
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Role: *
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - 
> Capabilities:
> (none)
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Principal:
> (none)
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Host:
> 311dcf7fd77c
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Web
> UI: http://311dcf7fd77c:8081
>
> 2021-09-29 01:15:39.680 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  -
> -
>
> ---
>
>
> which shows it is picking up the mesos.master and
> mesos.resourcemanager.framework.name parameters I am passing to
> mesos-appmaster.sh.
>
>
> In my Mesos dashboard I can see that the framework has been created with the
> right name, but it has no agents/tasks associated with it. So at least Flink
> has been able to connect to the Mesos master to create the framework.
>
>
> It is later in the mesos-appmaster log that I see the Mesos connection errors:
>
>
> 2021-09-29 01:15:39.726 [flink-akka.actor.default-dispatcher-3] DEBUG
> o.a.f.r.resourcemanager.slotmanager.DeclarativeSlotManager  - Starting
> the slot manager.
>
> 2021-09-29 01:15:39.815 [flink-akka.actor.default-dispatcher-2] DEBUG
> org.apache.flink.mesos.scheduler.ConnectionMonitor  - State change
> (StoppedState -> StoppedState) with data ()
>
> 2021-09-29 01:15:39.823 [flink-akka.actor.default-dispatcher-3] DEBUG
> o.a.f.runtime.resourcemanager.active.ActiveResourceManager  - Trigger
> heartbeat request.
>
> 2021-09-29 01:15:39.823 [flink-akka.actor.default-dispatcher-3] DEBUG
> org.apache.flink.mesos.scheduler.ReconciliationCoordinator  - State
> change (Suspended -> Suspended) with data ReconciliationData(Map(),0)
>
> 2021-09-29 01:15:39.823 [flink-akka.actor.default-dispatcher-3] DEBUG
> o.a.f.runtime.resourcemanager.active.ActiveResourceManager  - Trigger
> heartbeat request.
>
> 2021-09-29 01:15:39.824 [flink-akka.actor.default-dispatcher-3] INFO
> org.apache.flink.mesos.scheduler.ConnectionMonitor  - Connecting to
> Mesos...
>
> 2021-09-29 01:15:39.825 [flink-akka.actor.default-dispatcher-3] DEBUG
> org.apache.flink.mesos.scheduler.ConnectionMonitor  - State change
> (StoppedState -> ConnectingState) with data ()
>
> 2021-09-29 01:15:39.826 [flink-akka.actor.default-dispatcher-3] INFO
> o.a.f.m.runtime.clusterframework.MesosResourceManagerDriver  - Mesos
> resource manager started.
>
>