Re: Flink survey by data Artisans
From: Kostas Tzoumas
To: "d...@flink.apache.org"; user@flink.apache.org
Sent: Friday, November 18, 2016 7:28 AM
Subject: Flink survey by data Artisans

Hi everyone!

The Apache Flink community has evolved quickly over the past 2+ years, and there are now many production Flink deployments in organizations of all sizes. This is both exciting and humbling :-)

data Artisans is running a brief survey to understand Apache Flink usage and the needs of the community. We are hoping that this survey will help identify common usage patterns, as well as pinpoint the most needed features for Flink. At the conclusion of the survey we'll share a report with the community summarizing the findings. All responses will remain confidential, and only aggregate statistics will be shared.

I expect the survey to take 5-10 minutes, and all questions are optional--we appreciate any feedback that you're willing to provide. As a thank you, respondents will be entered in a drawing to win one of 10 tickets to Flink Forward 2017 (your choice of Berlin or the first-ever San Francisco edition).

The survey is available here: http://www.surveygizmo.com/s3/3166399/181bdb611f22

Looking forward to hearing back from you!

Best,
Kostas
Re: Why did the Flink Cluster JM crash?
Hi Till,

I just checked, and my test finished after 19 hours with the config below. The expected Linear Road test time is 3.5 hours, and I have achieved that for half of the data I sent yesterday. But for the full 105 GB worth of tuples I get 19 hours. No exceptions, no errors--clean, but almost 5 times slower than expected.

Thanks again.

From: amir bahmanyari <amirto...@yahoo.com>
To: Till Rohrmann <trohrm...@apache.org>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Sent: Thursday, November 10, 2016 9:35 AM
Subject: Re: Why did the Flink Cluster JM crash?
Re: Why did the Flink Cluster JM crash?
Thanks Till. I did all of that, with one difference: I have only 1 topic with 64 partitions, correlating to the total number of slots across all Flink servers. Can you elaborate on "As long as you have more Kafka topics than Flink Kafka consumers (subtasks)" please? Perhaps that's the bottleneck in my config and object creation. I send data to 1 topic across a 2-node Kafka cluster with 64 partitions, and KafkaIO in the Beam app is set to receive from it. How does "more Kafka topics" translate to KafkaIO settings in the Beam API?

Thanks + regards,
Amir

From: Till Rohrmann <trohrm...@apache.org>
To: amir bahmanyari <amirto...@yahoo.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Sent: Thursday, November 10, 2016 2:13 AM
Subject: Re: Why did the Flink Cluster JM crash?

The amount of data should be fine. Try to set the number of slots to the number of cores you have available. As long as you have more Kafka topics than Flink Kafka consumers (subtasks) you should be fine. But I think you can also decrease the number of Kafka partitions a little bit. I guess that an extensive number of partitions also comes with a price. But I'm no expert there. Hope your experiments run well with these settings.

Cheers,
Till
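The partition/consumer relationship discussed above can be illustrated with a small model: a consumer subtask can only read the partitions assigned to it, so when there are fewer partitions than consumer subtasks, some subtasks sit idle. The following is a simplified round-robin sketch, not Flink's actual Kafka partition-assignment logic:

```python
def assign_partitions(num_partitions: int, num_subtasks: int) -> dict:
    """Simplified round-robin model of partition-to-subtask assignment.
    Illustrative only; the real Flink Kafka consumer logic may differ."""
    return {
        subtask: [p for p in range(num_partitions) if p % num_subtasks == subtask]
        for subtask in range(num_subtasks)
    }

# 64 partitions over 64 subtasks: every subtask reads exactly one partition.
balanced = assign_partitions(64, 64)

# 32 partitions over 64 subtasks: half the subtasks have nothing to read.
idle_subtasks = sum(1 for parts in assign_partitions(32, 64).values() if not parts)
```

Under this model, Amir's setup (64 partitions, 64 total slots) is exactly balanced; dropping below 64 partitions would leave slots idle, while more partitions than subtasks simply means some subtasks read several partitions each.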
Re: Why did the Flink Cluster JM crash?
Thanks Till. I have been trying out many configuration combinations to reach the peak of what I can get as reasonable performance. And yes, when I drop the number of slots, I don't get the OOM; however, I don't get the response time I want either. The amount of data I send is rather large: about 105 GB sent over a stretch of 3.5 hours to a 4-node cluster running my Beam app, receiving from a 2-node Kafka cluster. From what I understand, you are suggesting that to get the best performance, the total number of slots should equal the total number of cores distributed across the cluster. To make sure we have done that, I will go back and repeat the testing with that in mind. FYI, the Kafka partition count is 4096--roughly 1024 per 16 cores per node. Is this reasonable? Once I know the answer to this question, I will readjust my config and repeat the test. I appreciate your response.

Amir

From: Till Rohrmann <till.rohrm...@gmail.com>
To: amir bahmanyari <amirto...@yahoo.com>
Cc: "user@flink.apache.org" <user@flink.apache.org>
Sent: Wednesday, November 9, 2016 1:27 AM
Subject: Re: Why did the Flink Cluster JM crash?

Hi Amir,

I fear that 900 slots per task manager is a bit too many unless your machine has 900 cores. As a rule of thumb you should allocate as many slots as your machines have cores. Maybe you could try to decrease the number of slots and see if you still observe an OOM error.

Cheers,
Till
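The rule of thumb repeated in this thread (one slot per core, parallelism equal to the total slot count) can be sketched as a flink-conf.yaml fragment. This is a hypothetical example assuming 16-core machines and the 4-node cluster described in the thread; the values are illustrative, not prescriptive:

```yaml
# Sketch only: slots-per-core rule of thumb from this thread.
# Assumes each of the 4 task managers runs on a 16-core machine.
taskmanager.numberOfTaskSlots: 16   # = cores per machine
parallelism.default: 64             # = 16 slots x 4 task managers
```

With these numbers, the 64 Kafka partitions mentioned later in the thread would map one-to-one onto the 64 consumer subtasks.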
Re: Why did the Flink Cluster JM crash?
Ok, there is an OOM exception... but this used to work fine with the same configuration. There are four nodes: beam1 through beam4. The Kafka partition count is 4096 > 3584 degree of parallelism.

jobmanager.rpc.address: beam1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 896
taskmanager.memory.preallocate: false
parallelism.default: 3584

Thanks for your valuable time, Till.

AnonymousParDo -> AnonymousParDo (3584/3584) (ebe8da5bda017ee31ad774c5bc5e5e88) switched from DEPLOYING to RUNNING
2016-11-08 22:51:44,471 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (3573/3584) (ddf5a8939c1fc4ad1e6d71f17fe5ab0b) switched from RUNNING to FAILED
2016-11-08 22:51:44,474 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (1/3584) (865c54432153a0230e62bf7610118ff8) switched from RUNNING to CANCELING
2016-11-08 22:51:44,474 INFO org.apache.flink.runtime.jobmanager.JobManager - Status of job e61cada683c0f7a709101c26c2c9a17c (benchbeamrunners-abahman-1108225128) changed to FAILING.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1587)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.registerTimer(StreamTask.java:652)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.registerTimer(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.registerTimer(StreamingRuntimeContext.java:92)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:381)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)

From: Till Rohrmann <till.rohrm...@gmail.com>
To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
Sent: Tuesday, November 8, 2016 2:11 PM
Subject: Re: Why did the Flink Cluster JM crash?

Hi Amir,

what does the JM logs say?

Cheers,
Till
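The failing configuration above oversubscribes each machine heavily, which fits the "unable to create new native thread" error: each source subtask needs at least one thread, so thread demand scales with parallelism until the OS limit is hit. A rough back-of-the-envelope check, assuming the 16-core machines implied by the "1024 partitions per 16 cores per node" figure in this thread:

```python
# Rough arithmetic only; 16 cores per node is an assumption taken from
# the "roughly 1024 partitions per 16 cores per node" figure in this thread.
cores_per_node = 16
nodes = 4

slots_per_tm = 896    # taskmanager.numberOfTaskSlots in the failing config
parallelism = 3584    # parallelism.default in the failing config (896 * 4)

# Each core is asked to carry 56 slots, each slot at least one task thread.
oversubscription = slots_per_tm // cores_per_node

# Till's rule of thumb (slots = cores) would instead give:
recommended_parallelism = cores_per_node * nodes
```

At 56 slots per core, a task manager can easily exhaust the per-user thread limit (see `ulimit -u`) long before it runs out of heap, which is exactly how this OOM variant presents.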
Re: Why did the Flink Cluster JM crash?
Oops! Sorry Till. I replicated it, and I see exceptions in the JM logs. How can I send the logs to you? Or which "interesting" part of them do you need, so I can copy/paste it here? Thanks.

From: Till Rohrmann <till.rohrm...@gmail.com>
To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
Sent: Tuesday, November 8, 2016 2:11 PM
Subject: Re: Why did the Flink Cluster JM crash?

Hi Amir,

what does the JM logs say?

Cheers,
Till
Re: Why did the Flink Cluster JM crash?
Clean. No errors... no exceptions :-( Thanks Till.

From: Till Rohrmann <till.rohrm...@gmail.com>
To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
Sent: Tuesday, November 8, 2016 2:11 PM
Subject: Re: Why did the Flink Cluster JM crash?

Hi Amir,

what does the JM logs say?

Cheers,
Till
Why did the Flink Cluster JM crash?
Hi colleagues,

I started the cluster fine, and started the Beam app running in the Flink cluster fine. The dashboard showed all tasks being consumed and open for business. I started sending data to the Beam app, and all of a sudden the Flink JM crashed. Exceptions below.

Thanks + regards,
Amir

java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
    at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622) //p.run();
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
    ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:140)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:244)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
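[Editorial note] A JobClientActorConnectionTimeoutException like the one above is commonly first attacked by giving the JobManager more heap and raising the client/actor timeouts before digging deeper. A hedged flink-conf.yaml sketch — the key names are from the Flink 1.x configuration docs, but the values here are illustrative guesses, not tuned recommendations:

```
# The thread's config uses jobmanager.heap.mb: 1024, which is tight for a JM
# coordinating 64 parallel subtasks; more headroom avoids GC-induced stalls.
jobmanager.heap.mb: 4096

# Be more patient before the client declares the JobManager connection lost.
akka.ask.timeout: 100 s
akka.lookup.timeout: 100 s
```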
Re: Flink Metrics
Hi colleagues,

Is there a link that describes Flink metrics and provides examples of how to use them, pls? I'd really appreciate it... Cheers

From: Till Rohrmann
To: user@flink.apache.org
Cc: d...@flink.apache.org
Sent: Monday, October 17, 2016 12:52 AM
Subject: Re: Flink Metrics

Hi Govind,
I think the DropwizardMeterWrapper implementation is just a reference implementation where it was decided to report the minute rate. You can define your own meter class which allows you to configure the rate interval accordingly.
Concerning timers, I think nobody has requested this metric so far. If you want, you can open a JIRA issue and contribute it. The community would really appreciate that.
Cheers,
Till

On Mon, Oct 17, 2016 at 5:26 AM, Govindarajan Srinivasaraghavan <govindragh...@gmail.com> wrote:
> Hi,
> I am currently using the Flink 1.2 snapshot and instrumenting my pipeline with Flink metrics. One small suggestion: currently the Meter interface only supports getRate(), which is always the one-minute rate.
> It would be great if all the rates (1 min, 5 min & 15 min) were exposed, to get a better picture of performance.
> Also, is there any reason why timers are not part of flink-metrics-core?
> Regards,
> Govind
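[Editorial note] To make Till's suggestion concrete: the idea is a meter wrapper that is told at construction time which rate getRate() should report. The sketch below is self-contained and purely illustrative — ConfigurableMeter, Interval, and the DoubleSupplier constructor arguments are made-up stand-ins, not Flink's Meter interface or Dropwizard's actual API. In a real implementation the three suppliers would delegate to a Dropwizard meter's one-/five-/fifteen-minute rates.

```java
import java.util.function.DoubleSupplier;

/**
 * Illustrative only: a meter configured at construction time with which
 * rate interval getRate() should report, instead of hard-coding the
 * one-minute rate as the reference DropwizardMeterWrapper does.
 */
public class ConfigurableMeter {
    public enum Interval { ONE_MINUTE, FIVE_MINUTES, FIFTEEN_MINUTES }

    private final DoubleSupplier oneMinuteRate;
    private final DoubleSupplier fiveMinuteRate;
    private final DoubleSupplier fifteenMinuteRate;
    private final Interval interval;

    public ConfigurableMeter(DoubleSupplier oneMinuteRate,
                             DoubleSupplier fiveMinuteRate,
                             DoubleSupplier fifteenMinuteRate,
                             Interval interval) {
        this.oneMinuteRate = oneMinuteRate;
        this.fiveMinuteRate = fiveMinuteRate;
        this.fifteenMinuteRate = fifteenMinuteRate;
        this.interval = interval;
    }

    /** Reports the rate for the interval chosen at construction time. */
    public double getRate() {
        switch (interval) {
            case FIVE_MINUTES:    return fiveMinuteRate.getAsDouble();
            case FIFTEEN_MINUTES: return fifteenMinuteRate.getAsDouble();
            default:              return oneMinuteRate.getAsDouble();
        }
    }
}
```

Govind's pipeline could then register one such meter per interval of interest, which sidesteps the "getRate() is always the one-minute rate" limitation without any change to flink-metrics-core.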
Re: How can I prove ....
Hi Stephan,

This is from the dashboard. Total parallelism is set to 1024, with 259 tasks per TM. All say "Running", but I get a *.out log on the beam4 server only (bottom of the servers list). Does this mean that all nodes are engaged in processing the data? Why do the encircled columns show 0's for their data-exchange report?

Thanks + regards,
Amir-

From: Stephan Ewen <se...@apache.org>
To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
Cc: Felix Dreissig <f...@f30.me>
Sent: Monday, September 26, 2016 2:18 AM
Subject: Re: How can I prove

You do not need to create any JSON. Just click on "Running Jobs" in the UI, and then on the job. The parallelism is shown as a number in the boxes of the graph.

On Sat, Sep 24, 2016 at 6:28 PM, amir bahmanyari <amirto...@yahoo.com> wrote:

Thanks Felix. Interesting. I tried to create the JSON, but it didn't work according to the sample code I found in the docs. There is a way to get the same JSON from the command line. Is there an example?

Thanks + regards,
Amir-

From: Felix Dreissig <f...@f30.me>
To: amir bahmanyari <amirto...@yahoo.com>
Cc: user@flink.apache.org
Sent: Saturday, September 24, 2016 8:18 AM
Subject: Re: How can I prove ....

Hi Amir,

On 23 Sep 2016, at 19:57, amir bahmanyari <amirto...@yahoo.com> wrote:
> Currently running with 512 slots all taken, as indicated by the dashboard.
> Are we talking about this? Then yes, based on no available slots, I assume I am at 512.

I guess Stephan is referring to the parallelism of single operators as displayed in the operator graph, see e.g. https://ci.apache.org/projects/flink/flink-docs-release-0.10/page/img/webclient_plan_view.png .

Regards,
Felix
Re: How can I prove ....
Thanks Felix. Interesting. I tried to create the JSON, but it didn't work according to the sample code I found in the docs. There is a way to get the same JSON from the command line. Is there an example?

Thanks + regards,
Amir-

From: Felix Dreissig <f...@f30.me>
To: amir bahmanyari <amirto...@yahoo.com>
Cc: user@flink.apache.org
Sent: Saturday, September 24, 2016 8:18 AM
Subject: Re: How can I prove

Hi Amir,

On 23 Sep 2016, at 19:57, amir bahmanyari <amirto...@yahoo.com> wrote:
> Currently running with 512 slots all taken, as indicated by the dashboard.
> Are we talking about this? Then yes, based on no available slots, I assume I am at 512.

I guess Stephan is referring to the parallelism of single operators as displayed in the operator graph, see e.g. https://ci.apache.org/projects/flink/flink-docs-release-0.10/page/img/webclient_plan_view.png .

Regards,
Felix
Re: How can I prove ....
Hi again, the following is from the dashboard while everything is supposedly running. No real-time change in the sent/received/#-of-records columns... but one node is definitely producing a *.out file, and all TMs are reporting in their *.log files. And the process will eventually end, but very slowly.

Thanks again, Aljoscha.

From: amir bahmanyari <amirto...@yahoo.com>
To: Aljoscha Krettek <aljos...@apache.org>; User <user@flink.apache.org>
Sent: Thursday, September 22, 2016 9:16 AM
Subject: Re: How can I prove

Thanks Aljoscha,
That's why I am wondering about this. I don't see the send/receive columns change at all, just 0's all the time. The only thing that changes is the timestamp. Is this an indication that the nodes in the cluster are not participating in the execution of the data?
Thanks again.
Amir-

From: Aljoscha Krettek <aljos...@apache.org>
To: amir bahmanyari <amirto...@yahoo.com>; User <user@flink.apache.org>
Sent: Thursday, September 22, 2016 5:01 AM
Subject: Re: How can I prove

Hi,
depending on the data source you might not be able to stress CPU/MEM, because the source might be too slow. As long as you see the numbers increasing in the Flink Dashboard for all operators you should be good.
Cheers,
Aljoscha

On Thu, 22 Sep 2016 at 00:26 amir bahmanyari <amirto...@yahoo.com> wrote:

That all nodes in a Flink cluster are involved simultaneously in processing the data? Programmatically, graphically... I need to stress CPU, MEM and all resources to their max. How can I guarantee this is happening in the Flink cluster? Out of 4 nodes, this is the highest resource usage I see from "top"... everything else is not even close:

top - 22:22:45 up 41 days, 2:39, 1 user, load average: 1.76, 1.55, 1.28
Tasks: 344 total, 1 running, 343 sleeping, 0 stopped, 0 zombie
%Cpu(s): 5.4 us, 1.0 sy, 0.0 ni, 93.5 id, 0.1 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem: 11551564+total, 65702020 used, 49813632 free, 115072 buffers
KiB Swap: 0 total, 0 used, 0 free. 3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this, and that's where the true, realistic perf numbers start showing up.

Thanks + regards,
Amir-
Re: How can I prove ....
Thanks Aljoscha,
That's why I am wondering about this. I don't see the send/receive columns change at all, just 0's all the time. The only thing that changes is the timestamp. Is this an indication that the nodes in the cluster are not participating in the execution of the data?
Thanks again.
Amir-

From: Aljoscha Krettek <aljos...@apache.org>
To: amir bahmanyari <amirto...@yahoo.com>; User <user@flink.apache.org>
Sent: Thursday, September 22, 2016 5:01 AM
Subject: Re: How can I prove

Hi,
depending on the data source you might not be able to stress CPU/MEM, because the source might be too slow. As long as you see the numbers increasing in the Flink Dashboard for all operators you should be good.
Cheers,
Aljoscha

On Thu, 22 Sep 2016 at 00:26 amir bahmanyari <amirto...@yahoo.com> wrote:

That all nodes in a Flink cluster are involved simultaneously in processing the data? Programmatically, graphically... I need to stress CPU, MEM and all resources to their max. How can I guarantee this is happening in the Flink cluster? Out of 4 nodes, this is the highest resource usage I see from "top"... everything else is not even close:

top - 22:22:45 up 41 days, 2:39, 1 user, load average: 1.76, 1.55, 1.28
Tasks: 344 total, 1 running, 343 sleeping, 0 stopped, 0 zombie
%Cpu(s): 5.4 us, 1.0 sy, 0.0 ni, 93.5 id, 0.1 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem: 11551564+total, 65702020 used, 49813632 free, 115072 buffers
KiB Swap: 0 total, 0 used, 0 free. 3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this, and that's where the true, realistic perf numbers start showing up.

Thanks + regards,
Amir-
How can I prove ....
That all nodes in a Flink cluster are involved simultaneously in processing the data? Programmatically, graphically... I need to stress CPU, MEM and all resources to their max. How can I guarantee this is happening in the Flink cluster? Out of 4 nodes, this is the highest resource usage I see from "top"... everything else is not even close:

top - 22:22:45 up 41 days, 2:39, 1 user, load average: 1.76, 1.55, 1.28
Tasks: 344 total, 1 running, 343 sleeping, 0 stopped, 0 zombie
%Cpu(s): 5.4 us, 1.0 sy, 0.0 ni, 93.5 id, 0.1 wa, 0.0 hi, 0.0 si, 0.0 st
KiB Mem: 11551564+total, 65702020 used, 49813632 free, 115072 buffers
KiB Swap: 0 total, 0 used, 0 free. 3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this, and that's where the true, realistic perf numbers start showing up.

Thanks + regards,
Amir-
Re: Flink Cluster Load Distribution Question
Thanx. Could you elaborate on writing to all partitions and not just one, pls? How can I make sure? I see all partitions consumed in the dashboard, and they get listed when my Beam app starts and the KafkaIO read operation gets associated with every single partition. What else?

Thanks so much again.

Sent from my iPhone

> On Sep 18, 2016, at 10:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> good to see that you're making progress! The number of partitions in the Kafka topic should be >= the number of parallel Flink slots and the parallelism with which you start the program. You also have to make sure to write to all partitions and not just to one.
>
> Cheers,
> Aljoscha
>
>> On Sun, 18 Sep 2016 at 21:50 amir bahmanyari <amirto...@yahoo.com> wrote:
>> Hi Aljoscha,
>> Thanks for your kind response.
>> - We are really benchmarking Beam & its runners, and it happened that we started with Flink. Therefore, any change we make to the approach must be a Beam code change that automatically affects the underlying runner.
>> - I changed the TextIO() back to KafkaIO(), reading from a Kafka cluster instead of a single node. It's behaving fine, except that the Kafka broker is running out of disk space & I am working around it as we speak.
>> - I removed Redis as per your recommendation & replaced it with Java ConcurrentHashMaps... It started to be a lot faster than before, for sure. I cannot use a Flink-specific solution for this; it must be something external, a Beam solution, or just a JVM solution. I picked ConcurrentHashMaps for now.
>> If I get by the Kafka broker disk-space issue, and don't get an out-of-memory error from the Flink servers in 3 hrs of runtime, I should be starting to see the light :))
>> Pls keep your fingers crossed, as testing is underway for 10 expressways of Linear Road, and that's 9 GB of tuples expected to be processed in 3.5 hrs.
>> - Kafka partitions in the Kafka topic = total number of slots available in the Flink servers. Should I alter that for better performance?
>>
>> Thanks Aljoscha & have a great weekend.
>> Amir-
>>
>> From: Aljoscha Krettek <aljos...@apache.org>
>> To: Amir Bahmanyari <amirto...@yahoo.com>; user <user@flink.apache.org>
>> Sent: Sunday, September 18, 2016 1:48 AM
>> Subject: Re: Flink Cluster Load Distribution Question
>>
>> This is not related to Flink, but in Beam you can read from a directory containing many files using something like this (from MinimalWordCount.java in Beam):
>>
>> TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")
>>
>> This will read all the files in the directory in parallel.
>>
>> For reading from Kafka I wrote this on another thread of yours:
>> Are you sure that all your Kafka partitions contain data? Did you have a look at the Kafka metrics to see how the individual partitions are filled? If only one partition contains data, then you will only read data in one parallel instance of the sources. How are you writing your data to Kafka?
>>
>> Flink/Beam should read from all partitions if all of them contain data. Could you please verify that all Kafka partitions contain data by looking at the metrics of your Kafka cluster? That would be a first step towards finding out where the problem lies.
>>
>> By the way, your code uses Beam in a highly non-idiomatic way. Interacting with an outside database, such as Redis, will always be the bottleneck in such a job. Flink provides an abstraction for dealing with state that is vastly superior to using an external system. We recently did a blog post about rewriting a similar streaming use case using Flink's internal state: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe that's interesting for you.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <amirto...@yahoo.com> wrote:
>> Thanks so much Aljoscha.
>> Is there an example that shows how to read from multiple files accurately, or from KafkaIO, and get perfect parallelism, pls?
>> Have a great weekend.
>>
>> Sent from my iPhone
>>
>>> On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> One observation here is that you're only reading from one file. This will mean that you won't get any parallelism. Everything is executed on just one task/thread.
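[Editorial note] The partitions-vs-slots question in this thread comes down to how topic partitions are spread over the parallel source subtasks. The sketch below is a simplified, self-contained illustration using plain round-robin assignment — the real Flink/Kafka assignment logic differs in detail — of why #partitions >= #parallel subtasks is needed for every subtask to receive data:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: round-robin assignment of Kafka topic partitions to
// parallel source subtasks. Flink's actual Kafka consumer uses a similar
// but not identical scheme; the point is purely the counting argument.
public class PartitionAssignment {

    /** Returns, per subtask, the list of partition indices it would read. */
    static List<List<Integer>> assign(int numPartitions, int numSubtasks) {
        List<List<Integer>> perSubtask = new ArrayList<>();
        for (int i = 0; i < numSubtasks; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Each partition goes to exactly one subtask, round-robin.
            perSubtask.get(p % numSubtasks).add(p);
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // 64 partitions over 64 subtasks: every subtask reads one partition.
        System.out.println(assign(64, 64).get(0));  // [0]
        // 4 partitions over 8 subtasks: half the subtasks read nothing.
        System.out.println(assign(4, 8).get(5));    // []
    }
}
```

With 64 partitions and parallelism 64, as in this thread, each subtask reads exactly one partition; with fewer partitions than subtasks (or with data written to only one partition), some subtasks receive nothing, which shows up as the 0 send/receive counts discussed elsewhere in this digest.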
Re: Fw: Flink Cluster Load Distribution Question
Hi Aljoscha,
The JM log is also attached. It seems like everything is OK and assigned to all nodes... Not sure why I don't get performance? :-(

Thanks + regards,
Amir-

From: Aljoscha Krettek <aljos...@apache.org>
To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
Sent: Wednesday, September 14, 2016 1:48 AM
Subject: Re: Fw: Flink Cluster Load Distribution Question

Hi,
this is a different job from the Kafka job that you have running, right? Could you maybe post the code for that as well?
Cheers,
Aljoscha

On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:

Hi Robert,
Sure, I am forwarding it to user. Sorry about that; I followed the "robot's" instructions :))

Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB). Let's call them node1, 2, 3, 4. Flink clustered with node1 running the JM & a TM; three more TMs running on node2, 3, and 4 respectively. I have Beam running with the Flink runner underneath. The input data is received by Beam TextIO() reading off 1.6 GB of data containing roughly 22 million tuples. All nodes have identical flink-conf.yaml, masters & slaves contents, as follows:

flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096

masters:
node1:8081

slaves:
node1
node2
node3
node4

Everything looks normal at ./start-cluster.sh & all daemons start on all nodes. The JM and TM log files get generated on all nodes. The dashboard shows all slots being used. I deploy the Beam app to the cluster where the JM is running, at node1. A *.out file gets generated as data is being processed. No *.out on the other nodes, just node1, where I deployed the fat jar. I tail -f the *.out log on node1 (master). It starts fine... but slowly degrades & becomes extremely slow. As we speak, I started the Beam app 13 hrs ago and it's still running.

How can I prove that ALL NODES are involved in processing the data at the same time, i.e. clustered? Do the above configurations look OK for reasonable performance? Given the above parameters, how can I improve performance in this cluster? What other information and/or dashboard screenshots are needed to clarify this issue?

I used these websites to do the configuration: Apache Flink: Cluster Setup, and Apache Flink: Configuration. In the second link, there is a config recommendation for the following, but this parameter is not in the configuration file out of the box:
- taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value, i.e. 32 KB, doesn't get picked up?

Sorry, too many questions. Pls let me know. I appreciate your help.
Cheers,
Amir-

----- Forwarded Message -----
From: Robert Metzger <rmetz...@apache.org>
To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari <amirto...@yahoo.com>
Sent: Tuesday, September 13, 2016 1:15 AM
Subject: Re: Flink Cluster Load Distribution Question

Hi Amir,
I would recommend posting such questions to the user@flink mailing list in the future. This list is meant for development-related topics.
I think we need more details to understand why your application is not running properly. Can you quickly describe what your topology is doing? Are you setting the parallelism to a value >= 1?
Regards,
Robert

On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <amirto...@yahoo.com.invalid> wrote:
> Hi Colleagues,
> Just joined this forum. I have done everything possible to get a 4-node Flink cluster to work properly & run a Beam app. It always generates system-output logs (*.out) on only one node. It's so slow for 4 nodes being there. It seems like the load is not distributed amongst all 4 nodes but only one node, most of the time the one where the JM runs. I ran/tested it on a single node, and it was even faster with the same load. Not sure what's not being configured right.
> 1- Why am I getting the SystemOut .out log on only one server? All nodes get their TaskManager log files updated, though.
> 2- Why don't I see load being distributed amongst all 4 nodes, but only one all the time?
> 3- Why does the dashboard show a 0 (zero) for the send/receive numbers of all TaskManagers?
> The dashboard shows all the right stuff. Top shows not much of the resources being stressed on any of the nodes. I can share its contents if it helps diagnose the issue.
> Thanks + I appreciate your valuable time, response & help.
> Amir-

[Attachment: flink-abahman-jobmanager-1-beam1.log]
Re: Fw: Flink Cluster Load Distribution Question
Hi Aljoscha,
Thanks for your response. It's the same job, but I am reading through TextIO() instead of a Kafka topic. I thought that would make a difference; it doesn't. Same slowness in the Flink cluster. I had sent you the code that reads from KafkaIO(); nothing is different except commenting out the KafkaIO() & un-commenting TextIO(). It's attached along with the Support class. Is there anything interesting you see in my configuration that may cause slowness and/or the lack of the right distribution in the cluster as a whole? I also attached my config files from the JM node... same for the other nodes.

Have a wonderful day & thanks for your attention.
Amir-

From: Aljoscha Krettek <aljos...@apache.org>
To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
Sent: Wednesday, September 14, 2016 1:48 AM
Subject: Re: Fw: Flink Cluster Load Distribution Question

Hi,
this is a different job from the Kafka job that you have running, right? Could you maybe post the code for that as well?
Cheers,
Aljoscha

On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:

Hi Robert,
Sure, I am forwarding it to user. Sorry about that; I followed the "robot's" instructions :))

Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB). Let's call them node1, 2, 3, 4. Flink clustered with node1 running the JM & a TM; three more TMs running on node2, 3, and 4 respectively. I have Beam running with the Flink runner underneath. The input data is received by Beam TextIO() reading off 1.6 GB of data containing roughly 22 million tuples. All nodes have identical flink-conf.yaml, masters & slaves contents, as follows:

flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096

masters:
node1:8081

slaves:
node1
node2
node3
node4

Everything looks normal at ./start-cluster.sh & all daemons start on all nodes. The JM and TM log files get generated on all nodes. The dashboard shows all slots being used. I deploy the Beam app to the cluster where the JM is running, at node1. A *.out file gets generated as data is being processed. No *.out on the other nodes, just node1, where I deployed the fat jar. I tail -f the *.out log on node1 (master). It starts fine... but slowly degrades & becomes extremely slow. As we speak, I started the Beam app 13 hrs ago and it's still running.

How can I prove that ALL NODES are involved in processing the data at the same time, i.e. clustered? Do the above configurations look OK for reasonable performance? Given the above parameters, how can I improve performance in this cluster? What other information and/or dashboard screenshots are needed to clarify this issue?

I used these websites to do the configuration: Apache Flink: Cluster Setup, and Apache Flink: Configuration. In the second link, there is a config recommendation for the following, but this parameter is not in the configuration file out of the box:
- taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value, i.e. 32 KB, doesn't get picked up?

Sorry, too many questions. Pls let me know. I appreciate your help.
Cheers,
Amir-

----- Forwarded Message -----
From: Robert Metzger <rmetz...@apache.org>
To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari <amirto...@yahoo.com>
Sent: Tuesday, September 13, 2016 1:15 AM
Subject: Re: Flink Cluster Load Distribution Question

Hi Amir,
I would recommend posting such questions to the user@flink mailing list in the future. This list is meant for development-related topics.
I think we need more details to understand why your application is not running properly. Can you quickly describe what your topology is doing? Are you setting the parallelism to a value >= 1?
Regards,
Robert

On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <amirto...@yahoo.com.invalid> wrote:
> Hi Colleagues,
> Just joined this forum. I have done everything possible to get a 4-node Flink cluster to work properly & run a Beam app. It always generates system-output logs (*.out) on only one node. It's so slow for 4 nodes being there. It seems like the load is not distributed amongst all 4 nodes but only one node, most of the time the one where the JM runs. I ran/tested it on a single node, and it was even faster with the same load. Not sure what's not being configured right.
> 1- Why am I getting the SystemOut .out log on only one server? All nodes get their TaskManager log files updated, though.
> 2- Why don't I see load being distributed amongst all 4 nodes, but only one all the
Fw: Flink Cluster Load Distribution Question
Hi Robert,
Sure, I am forwarding it to user. Sorry about that; I followed the "robot's" instructions :))

Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB). Let's call them node1, 2, 3, 4. Flink clustered with node1 running the JM & a TM; three more TMs running on node2, 3, and 4 respectively. I have Beam running with the Flink runner underneath. The input data is received by Beam TextIO() reading off 1.6 GB of data containing roughly 22 million tuples. All nodes have identical flink-conf.yaml, masters & slaves contents, as follows:

flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096

masters:
node1:8081

slaves:
node1
node2
node3
node4

Everything looks normal at ./start-cluster.sh & all daemons start on all nodes. The JM and TM log files get generated on all nodes. The dashboard shows all slots being used. I deploy the Beam app to the cluster where the JM is running, at node1. A *.out file gets generated as data is being processed. No *.out on the other nodes, just node1, where I deployed the fat jar. I tail -f the *.out log on node1 (master). It starts fine... but slowly degrades & becomes extremely slow. As we speak, I started the Beam app 13 hrs ago and it's still running.

How can I prove that ALL NODES are involved in processing the data at the same time, i.e. clustered? Do the above configurations look OK for reasonable performance? Given the above parameters, how can I improve performance in this cluster? What other information and/or dashboard screenshots are needed to clarify this issue?

I used these websites to do the configuration: Apache Flink: Cluster Setup, and Apache Flink: Configuration. In the second link, there is a config recommendation for the following, but this parameter is not in the configuration file out of the box:
- taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value, i.e. 32 KB, doesn't get picked up?

Sorry, too many questions. Pls let me know. I appreciate your help.
Cheers,
Amir-

----- Forwarded Message -----
From: Robert Metzger <rmetz...@apache.org>
To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari <amirto...@yahoo.com>
Sent: Tuesday, September 13, 2016 1:15 AM
Subject: Re: Flink Cluster Load Distribution Question

Hi Amir,
I would recommend posting such questions to the user@flink mailing list in the future. This list is meant for development-related topics.
I think we need more details to understand why your application is not running properly. Can you quickly describe what your topology is doing? Are you setting the parallelism to a value >= 1?
Regards,
Robert

On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <amirto...@yahoo.com.invalid> wrote:
> Hi Colleagues,
> Just joined this forum. I have done everything possible to get a 4-node Flink cluster to work properly & run a Beam app. It always generates system-output logs (*.out) on only one node. It's so slow for 4 nodes being there. It seems like the load is not distributed amongst all 4 nodes but only one node, most of the time the one where the JM runs. I ran/tested it on a single node, and it was even faster with the same load. Not sure what's not being configured right.
> 1- Why am I getting the SystemOut .out log on only one server? All nodes get their TaskManager log files updated, though.
> 2- Why don't I see load being distributed amongst all 4 nodes, but only one all the time?
> 3- Why does the dashboard show a 0 (zero) for the send/receive numbers of all TaskManagers?
> The dashboard shows all the right stuff. Top shows not much of the resources being stressed on any of the nodes. I can share its contents if it helps diagnose the issue.
> Thanks + I appreciate your valuable time, response & help.
> Amir-
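[Editorial note] On the taskmanager.network.bufferSizeInBytes question raised in this thread: if the key is absent from flink-conf.yaml, the documented default (32 KB) is used, so adding it manually only matters when a different value is wanted. An illustrative fragment — the key names are as in the Flink docs the thread links to, and the values shown are the defaults from that config, not tuned recommendations:

```
# Network buffer size; 32768 bytes (32 KB) is the documented default and
# applies whenever the key is absent, so listing it explicitly changes nothing.
taskmanager.network.bufferSizeInBytes: 32768

# Already present in the thread's config; at parallelism 64 across 4 TMs,
# more buffers than this default-ish value may be needed.
taskmanager.network.numberOfBuffers: 4096
```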