Just out of curiosity, what machines are you running this on? I'm asking because the number of task slots should roughly correlate with the number of CPU cores.
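That rule of thumb can be checked with a quick sketch. `check_slots` is a hypothetical helper, not a Flink tool; the 512 comes from the `taskmanager.numberOfTaskSlots` value quoted below, and the `nproc` call is just one way to read the core count on Linux.

```shell
#!/bin/sh
# Rule of thumb: taskmanager.numberOfTaskSlots should roughly match the
# number of CPU cores on the machine running that TaskManager.
check_slots() {
    cores=$1
    slots=$2
    if [ "$slots" -gt "$cores" ]; then
        echo "WARN: $slots slots configured but only $cores cores"
    else
        echo "OK: $slots slots on $cores cores"
    fi
}

# On a real node you would pass the live core count:
check_slots "$(nproc)" 512
```

On typical hardware this flags 512 slots per TaskManager as far above the core count, which is why the question matters.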
On Fri, 4 Nov 2016 at 10:34 Aljoscha Krettek <[email protected]> wrote:

> You should try and find out why everything is happening on just one node.
> Have you looked at your Kafka partitions, i.e. is the data evenly
> distributed across the partitions of your Kafka topic, or is all data
> pushed to one partition? That would explain why processing is only
> happening on one node, namely the node that is reading the partition that
> has all the data.
>
> On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <[email protected]> wrote:
>
> Thanks Aljoscha.
> I have been tuning Flink memory, network buffers, etc.
> This occurred on THAT ONE NODE where I see the *.out logs get created by
> Flink. I lowered the memory that Flink allocates, i.e. from the default
> 70% to 50%, and this exception was thrown on that one node only. The
> other nodes stayed up and didn't crash.
> There is SOMETHING different about THAT ONE NODE :-) I cannot figure it
> out. At every ./start-cluster, THAT ONE NODE may or may not change, on a
> random basis, so I can't just tune THAT ONE NODE. Next time, another node
> may become THAT ONE NODE.
>
> I have the following set in flink-conf.yaml on each node:
>
> akka.ask.timeout: 300s
> jobmanager.heap.mb: 256            //Could this be too small?
> taskmanager.heap.mb: 102400
> taskmanager.memory.fraction: 0.6   //Lowering this further causes the
>                                    //exception below. Am testing with 0.6
>                                    //instead of the 0.7 default.
> taskmanager.numberOfTaskSlots: 512
> taskmanager.memory.preallocate: false
> parallelism.default: 2048
> taskmanager.network.numberOfBuffers: 131072
>
> Appreciate any feedback.
> Amir-
>
> ------------------------------
> *From:* Aljoscha Krettek <[email protected]>
> *To:* amir bahmanyari <[email protected]>;
> "[email protected]" <[email protected]>
> *Sent:* Thursday, November 3, 2016 12:45 AM
> *Subject:* Re: What does this exception mean to you?
>
> That looks like a Flink problem. The TaskManager on beam4 seems to have
> crashed for some reason.
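Back-of-the-envelope math on the settings quoted above, assuming the four-node cluster described later in the thread and Flink's default 32 KiB network buffer segment size (an assumption; check `taskmanager.memory.segment-size` for your version):

```shell
#!/bin/sh
# Sanity math for the quoted flink-conf.yaml values.
NODES=4                 # assumption: four TaskManagers (beam1..beam4)
SLOTS_PER_TM=512        # taskmanager.numberOfTaskSlots
PARALLELISM=2048        # parallelism.default
BUFFERS=131072          # taskmanager.network.numberOfBuffers
BUFFER_BYTES=32768      # assumed default network buffer segment size

# Default parallelism exactly consumes every slot in the cluster,
# leaving no headroom if one TaskManager is lost.
TOTAL_SLOTS=$((NODES * SLOTS_PER_TM))
echo "total slots: $TOTAL_SLOTS (parallelism.default: $PARALLELISM)"

# Network buffers alone reserve a large slice of each TaskManager heap.
BUFFER_MB=$((BUFFERS * BUFFER_BYTES / 1024 / 1024))
echo "network buffer memory per TM: ${BUFFER_MB} MiB of 102400 MiB heap"
```

With these numbers, 4 x 512 = 2048 slots matches `parallelism.default` exactly, and the buffers alone reserve about 4 GiB per TaskManager, which is worth keeping in mind when lowering `taskmanager.memory.fraction`.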
> You might be able to find that reason by looking at the logs on that
> machine.
>
> On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <[email protected]> wrote:
>
> Thanks+regards,
> beam4 and beam1 are hostnames.
> BenchBeamRunners.java is my Beam app, running in a four-server Flink
> cluster. The other nodes are still running, except the one that failed,
> beam4. beam1 has the JM running.
>
> Amir-
>
> java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
>     at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException:
> The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>     at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
>     ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The slot in which the task was executed
> has been released. Probably loss of TaskManager
> 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL:
> akka.tcp://[email protected]:44399/user/taskmanager
>     at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
>     at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>     at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>     at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
>     at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>     at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>     at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     ... 2 more
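For the "Probably loss of TaskManager ... @ beam4" cause at the bottom of that trace, a triage sketch for the failed node. `find_kill_reason` is a hypothetical helper, and the log path in the comment is an assumption about the Flink layout, not a fixed location:

```shell
#!/bin/sh
# Scan log text for common causes of a TaskManager dying without a clean
# shutdown: JVM heap exhaustion, a JVM crash, or the kernel OOM killer.
find_kill_reason() {
    grep -iE "OutOfMemoryError|A fatal error has been detected|Killed process" "$@"
}

# Typical use on the failed node (beam4); paths are illustrative:
#   find_kill_reason /path/to/flink/log/flink-*-taskmanager-*.log
#   dmesg | find_kill_reason -
```

A hit from `dmesg` with no matching Java error usually means the kernel OOM killer took the process, which would fit a node where the memory settings above were pushed too high.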
