Hi Aljoscha,
Thanks for your reply.
I am using Microsoft Azure A11 VMs for a 4-node Beam/Flink cluster, and Dxy for
ingestion, i.e. the 2-node Kafka cluster shown below.
After I create the partitions on Kafka, the describe option shows that they are
evenly distributed between the two Kafka nodes. Or, at least, this is my
understanding. Below is partial output; it shows that all 2048 partitions
are in use.
Thanks so much for your help. I hope we can draw some conclusions from this and
find the bottleneck.
Have a great weekend.
Amir-
[aba@kafka01 kafka_2.11-0.10.0.1]$ ./bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic linroad3 |more
Topic:linroad3  PartitionCount:2048  ReplicationFactor:2  Configs:
    Topic: linroad3  Partition: 0     Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 1     Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 2     Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 3     Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 4     Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 5     Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 6     Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 7     Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 8     Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 9     Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 10    Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 11    Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 12    Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 13    Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 14    Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 15    Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 16    Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 17    Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 18    Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 19    Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 20    Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 21    Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 22    Leader: 1  Replicas: 1,2  Isr: 1,2
    ...
    Topic: linroad3  Partition: 2045  Leader: 2  Replicas: 2,1  Isr: 2,1
    Topic: linroad3  Partition: 2046  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: linroad3  Partition: 2047  Leader: 2  Replicas: 2,1  Isr: 2,1
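
A quick way to check the "evenly distributed" reading of the describe output is to count how many partitions each broker leads. The sketch below is only an illustration: the regular expression is written against the row layout pasted above, and `leader_counts` is a hypothetical helper name. Note that it only measures leader placement across brokers; it says nothing about how many records each partition actually holds.

```python
import re
from collections import Counter

# Matches one partition row of the describe output shown above.
# \s+ also tolerates rows that were wrapped across lines by a mail client.
ROW = re.compile(
    r"Partition:\s*(\d+)\s+Leader:\s*(\d+)\s+Replicas:\s*([\d,]+)\s+Isr:\s*([\d,]+)"
)

def leader_counts(describe_text):
    """Return a Counter mapping broker id -> number of partitions it leads."""
    return Counter(int(m.group(2)) for m in ROW.finditer(describe_text))

# First four rows from the output above.
sample = """
Topic: linroad3 Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: linroad3 Partition: 1 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: linroad3 Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: linroad3 Partition: 3 Leader: 2 Replicas: 2,1 Isr: 2,1
"""

counts = leader_counts(sample)
print(dict(counts))  # {1: 2, 2: 2}
```

Feeding the full, untruncated output of the describe command into `leader_counts` would give the leader split for all 2048 partitions.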
From: Aljoscha Krettek <[email protected]>
To: amir bahmanyari <[email protected]>; "[email protected]"
<[email protected]>
Sent: Friday, November 4, 2016 10:44 AM
Subject: Re: What does this exception mean to you?
Just out of curiosity, what machines are you running this on? I'm asking
because the number of task slots should roughly correlate with the number of
CPU cores.
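
The rule of thumb above can be checked with quick arithmetic. The sketch below uses the 512 slots per TaskManager and the 4 nodes quoted elsewhere in this thread, and assumes one TaskManager per node. The figure of 16 cores per node is an assumption (it is what the Azure A11 size is generally listed with, but the thread does not state it), so substitute the real core count.

```python
def slots_per_core(slots_per_taskmanager, cores_per_node):
    """Return how many task slots compete for each CPU core on one node."""
    return slots_per_taskmanager / cores_per_node

NODES = 4                     # from the thread: 4-node Beam/Flink cluster
SLOTS_PER_TASKMANAGER = 512   # from the thread: taskmanager.numberOfTaskSlots
ASSUMED_CORES_PER_NODE = 16   # assumption: not stated in the thread

total_slots = NODES * SLOTS_PER_TASKMANAGER
ratio = slots_per_core(SLOTS_PER_TASKMANAGER, ASSUMED_CORES_PER_NODE)
print(f"total slots: {total_slots}, slots per core: {ratio:.0f}")
# total slots: 2048, slots per core: 32
```

Under these assumptions the total slot count matches parallelism.default (2048), but each core would be shared by about 32 slots rather than roughly one.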
On Fri, 4 Nov 2016 at 10:34 Aljoscha Krettek <[email protected]> wrote:
You should try and find out why everything is just happening on one node. Have
you looked at your Kafka Partitions, i.e. is the data evenly distributed across
the partitions of your Kafka topic or is all data pushed to one partition? This
would actually explain why processing is only happening on one node, namely
the node that is reading the partition that has all the data.
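
To illustrate how all data can end up in one partition, here is a minimal sketch of key-based partition assignment. It is a simplification under stated assumptions: `zlib.crc32` stands in for the hash (Kafka's default partitioner uses a murmur2 hash of the key bytes), and the keys are made up for illustration. The point is only that records sharing a single key always map to the same partition, however many partitions the topic has.

```python
import zlib
from collections import Counter

def assign_partition(key, num_partitions):
    """Map a record key to a partition index.

    Stand-in hash: CRC32 is used here only for illustration; it is not
    the hash Kafka itself uses.
    """
    return zlib.crc32(key) % num_partitions

def records_per_partition(keys, num_partitions):
    """Count how many records land in each partition."""
    return Counter(assign_partition(k, num_partitions) for k in keys)

NUM_PARTITIONS = 2048  # from the thread

# Hypothetical workloads: one constant key vs. many distinct keys.
same_key = [b"linroad"] * 1000
varied_keys = [str(i).encode() for i in range(1000)]

hot = records_per_partition(same_key, NUM_PARTITIONS)
spread = records_per_partition(varied_keys, NUM_PARTITIONS)

print("partitions used with one key:", len(hot))
print("partitions used with varied keys:", len(spread))
```

With the constant key exactly one partition receives all 1000 records, so only the consumer reading that partition has work to do.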
On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <[email protected]> wrote:
Thanks Aljoscha.
I have been tuning Flink memory, network buffers, etc. And this occurred in
THAT ONE NODE where I see *.out logs get created by Flink. I lowered the memory
that Flink allocates, i.e. from the 70% default to 50%, and this exception was
thrown in that one node only. The other nodes were up and didn't crash. There is
SOMETHING different about THAT ONE NODE :-) I cannot figure it out. At every
./start-cluster, THAT ONE NODE may or may not change, on a random basis. So I
can't just tune THAT ONE NODE. Next time, another node may become THAT ONE NODE.
I have the following set in flink-conf.yaml on each node:
akka.ask.timeout : 300s
jobmanager.heap.mb: 256 //Could this be too small?
taskmanager.heap.mb: 102400
taskmanager.memory.fraction: 0.6 //Changing this to a lower value causes the exception below. I am testing with 0.6, below the 0.7 default.
taskmanager.numberOfTaskSlots: 512
taskmanager.memory.preallocate: false
parallelism.default: 2048
taskmanager.network.numberOfBuffers: 131072
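
As a rough sanity check on these settings, the sketch below estimates how much TaskManager heap the network buffers and the managed-memory fraction would claim. Two things are assumptions rather than facts from this thread: a 32 KB memory segment per network buffer (Flink's default segment size, as far as I recall), and the fraction being applied to the heap left over after network buffers. Flink measures free heap at startup, so real numbers will differ; treat this as an order-of-magnitude estimate only.

```python
ASSUMED_SEGMENT_BYTES = 32 * 1024  # assumption: default Flink memory segment size

def network_buffer_mb(num_buffers, segment_bytes=ASSUMED_SEGMENT_BYTES):
    """Estimate the memory (in MB) pinned by the network buffer pool."""
    return num_buffers * segment_bytes / (1024 * 1024)

def managed_memory_mb(heap_mb, fraction, network_mb):
    """Estimate managed memory (in MB).

    Assumption: the fraction is applied to the heap that remains after
    the network buffers have been subtracted.
    """
    return (heap_mb - network_mb) * fraction

# Values from the flink-conf.yaml quoted above.
heap_mb = 102400
fraction = 0.6
num_buffers = 131072

net_mb = network_buffer_mb(num_buffers)
managed_mb = managed_memory_mb(heap_mb, fraction, net_mb)
remaining_mb = heap_mb - net_mb - managed_mb

print(f"network buffers: {net_mb:.0f} MB")
print(f"managed memory:  {managed_mb:.0f} MB")
print(f"left for user code and JVM overhead: {remaining_mb:.0f} MB")
```

Under these assumptions the network buffers take about 4 GB of the 100 GB heap, and lowering the fraction shifts memory from Flink's managed pool to the remaining heap.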
Appreciate any feedback.
Amir-
From: Aljoscha Krettek <[email protected]>
To: amir bahmanyari <[email protected]>; "[email protected]"
<[email protected]>
Sent: Thursday, November 3, 2016 12:45 AM
Subject: Re: What does this exception mean to you?
That looks like a Flink problem. The TaskManager on beam4 seems to have crashed
for some reason. You might be able to find that reason by looking at the logs
on that machine.
On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <[email protected]> wrote:
Thanks+regards,
beam4 and beam1 are hostnames. BenchBeamRunners.java is my Beam app running in a
four-server Flink cluster.
The other nodes are still running, except the one that failed, beam4. beam1 has
the JM running.
Amir-
java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
    at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
    ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL: akka.tcp://[email protected]:44399/user/taskmanager
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
    at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
    at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
    at akka.actor.ActorCell.invoke(ActorCell.scala:486)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    ... 2 more
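
In a chained trace like this one, the innermost "Caused by" entry usually carries the useful message; here it points at the loss of the TaskManager on beam4. The sketch below shows one way to pull that last cause out of a pasted trace. `root_cause` is a hypothetical helper, the regular expression is written only against the layout of the trace above, and it tolerates traces whose lines were re-wrapped by a mail client.

```python
import re

# "Caused by:" and the message may be split across lines in mail-wrapped
# traces, so whitespace of any kind is accepted between the pieces.
# The message ends where the first stack frame ("at pkg.Class.method(") starts.
CAUSE = re.compile(
    r"Caused\s+by:\s+(?P<cls>[\w.$]+):\s+(?P<msg>.*?)(?=\s+at\s+[\w.$<>]+\(|\Z)",
    re.S,
)

def root_cause(trace):
    """Return (exception class, message) of the last 'Caused by' entry, or None."""
    causes = [
        (m.group("cls"), " ".join(m.group("msg").split()))
        for m in CAUSE.finditer(trace)
    ]
    return causes[-1] if causes else None

# Shortened excerpt of the trace above.
sample = """java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
"""

cause = root_cause(sample)
print(cause[0])
print(cause[1])
```

The message only says that the JobManager lost contact with the TaskManager; the reason for the loss would have to come from the TaskManager's own log on that host.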