OK, with those machines I highly recommend setting the number of task slots
per TaskManager to at most 16, in line with the 16 cores of the A11 VMs.
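
A rough sketch of the arithmetic behind that number. The helper name
`suggested_slots` and the one-slot-per-core rule of thumb are assumptions made
for illustration; Flink does not enforce them. The inputs (4 Flink nodes, 16
cores each) come from the VM specs quoted below.

```python
# Rule of thumb (an assumption, not a Flink requirement):
# roughly one task slot per CPU core on each TaskManager.
def suggested_slots(cores_per_node: int, nodes: int) -> dict:
    slots_per_taskmanager = cores_per_node
    total_slots = slots_per_taskmanager * nodes
    return {
        "slots_per_taskmanager": slots_per_taskmanager,
        "total_slots": total_slots,
        # A job cannot run with a parallelism higher than the slots available.
        "max_job_parallelism": total_slots,
    }

plan = suggested_slots(cores_per_node=16, nodes=4)
print(plan)
```

Under these assumptions the cluster offers 64 slots in total, compared with
the 4 x 512 slots and the parallelism.default of 2048 in the posted
flink-conf.yaml.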

For Kafka, the fact that the partitions are distributed across the machines
(which they seem to be) does not guarantee that the data is written evenly
to all partitions. How is the data written to Kafka?
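
To illustrate why the way records are keyed matters, here is a minimal
sketch of hash-based partition assignment. It is not Kafka's actual
partitioner: `partition_for` is a hypothetical helper and `zlib.crc32` is
only a stand-in for the hash a real producer would apply to the key. The
partition count of 2048 is taken from the topic description quoted below;
the keys are made up.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 2048  # partition count of the topic in this thread

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Stand-in for a key-hash partitioner (crc32 is illustrative only).
    return zlib.crc32(key) % num_partitions

def partition_histogram(keys) -> Counter:
    # Count how many records land in each partition.
    return Counter(partition_for(k) for k in keys)

# Every record carries the same key: all of them hash to one partition.
constant = partition_histogram([b"same-key"] * 10_000)

# Records carry many distinct (hypothetical) keys: they spread out.
varied = partition_histogram([f"car-{i}".encode() for i in range(10_000)])

print("partitions used with a constant key:", len(constant))
print("partitions used with varied keys:", len(varied))
```

If the producer always sends the same key, a single partition receives all
the data no matter how many partitions the topic has, and only the consumer
reading that partition has work to do.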

On Fri, 4 Nov 2016 at 19:31 amir bahmanyari <[email protected]> wrote:

> Hi Aljoscha
> Sorry, the Kafka VMs are A10 (not Dxy).
> In case the images don't get through:
> A11: 16 cores, 112 GB, 16 Data disks, 16x500 Max IOPS, Load Balancing
> A10: 8 cores, 56 GB, 16 Data disks, 16x500 Max IOPS, Load Balancing
>
> Thanks.
> Amir-
>
> ------------------------------
> *From:* amir bahmanyari <[email protected]>
> *To:* "[email protected]" <[email protected]>
> *Sent:* Friday, November 4, 2016 11:27 AM
>
> *Subject:* Re: What does this exception mean to you?
>
> Hi Aljoscha,
> Thanks for your reply.
> I am using Microsoft Azure A11 VMs for a 4-node Beam/Flink cluster and A10
> VMs for ingestion, i.e. a 2-node Kafka cluster, with the specs given below.
> [image: Inline image]
> [image: Inline image]
>
> After I create the partitions on Kafka, the describe option shows that they
> are evenly distributed between the two Kafka nodes.
> Or at least this is my understanding. Below is partial output, and it
> shows that all 2048 partitions are in use.
> Thanks so much for your help. I hope we can draw some conclusion out of
> this and find the bottleneck.
> Have a great weekend.
> Amir-
>
> [aba@kafka01 kafka_2.11-0.10.0.1]$ ./bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic linroad3 |more
> Topic:linroad3  PartitionCount:2048     ReplicationFactor:2     Configs:
>         Topic: linroad3 Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 4    Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 5    Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 6    Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 7    Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 8    Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 9    Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 10   Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 11   Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 12   Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 13   Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 14   Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 15   Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 16   Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 17   Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 18   Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 19   Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 20   Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 21   Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 22   Leader: 1       Replicas: 1,2   Isr: 1,2
>
> .................................................................................................
>         Topic: linroad3 Partition: 2045 Leader: 2       Replicas: 2,1   Isr: 2,1
>         Topic: linroad3 Partition: 2046 Leader: 1       Replicas: 1,2   Isr: 1,2
>         Topic: linroad3 Partition: 2047 Leader: 2       Replicas: 2,1   Isr: 2,1
>
>
>
> ------------------------------
> *From:* Aljoscha Krettek <[email protected]>
> *To:* amir bahmanyari <[email protected]>; "
> [email protected]" <[email protected]>
> *Sent:* Friday, November 4, 2016 10:44 AM
> *Subject:* Re: What does this exception mean to you?
>
> Just out of curiosity, what machines are you running this on? I'm asking
> because the number of task slots should roughly correlate with the number
> of CPU cores.
>
> On Fri, 4 Nov 2016 at 10:34 Aljoscha Krettek <[email protected]> wrote:
>
> You should try and find out why everything is just happening on one node.
> Have you looked at your Kafka Partitions, i.e. is the data evenly
> distributed across the partitions of your Kafka topic or is all data pushed
> to one partition? This would actually explain why processing is only
> happening on one node, namely the node that is reading the partition that
> has all the data.
>
> On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <[email protected]> wrote:
>
> Thanks Aljoscha.
> I have been tuning Flink memory, network buffers, etc.
> And this occurred on THAT ONE NODE where I see Flink create *.out logs.
> I lowered the fraction of memory that Flink allocates from the default 70%
> to 50%.
> The exception was thrown on that one node only. The other nodes were up and
> didn't crash.
> There is SOMETHING different about THAT ONE NODE :-) I cannot figure it
> out.
> On every ./start-cluster, THAT ONE NODE may or may not change, at random.
> So I cannot just tune THAT ONE NODE. Next time, another node may become
> THAT ONE NODE.
>
> I have the following set in flink-conf.yaml on each node:
>
> akka.ask.timeout : 300s
> jobmanager.heap.mb: 256 //Could this be too small?
> taskmanager.heap.mb: 102400
> taskmanager.memory.fraction: 0.6 //Changing this to a lower value causes
> the exception below. I am testing with 0.6 (below the 0.7 default).
> taskmanager.numberOfTaskSlots: 512
> taskmanager.memory.preallocate: false
> parallelism.default: 2048
> taskmanager.network.numberOfBuffers: 131072
>
>
> Appreciate any feedback.
> Amir-
> ------------------------------
> *From:* Aljoscha Krettek <[email protected]>
> *To:* amir bahmanyari <[email protected]>; "
> [email protected]" <[email protected]>
> *Sent:* Thursday, November 3, 2016 12:45 AM
> *Subject:* Re: What does this exception mean to you?
>
> That looks like a Flink problem. The TaskManager on beam4 seems to have
> crashed for some reason. You might be able to find that reason by looking
> at the logs on that machine.
>
> On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <[email protected]> wrote:
>
> Thanks+regards,
> beam4 and beam1 are hostnames.
> BenchBeamRunners.java is my Beam app running on a four-server Flink
> cluster.
> The other nodes are still running; only beam4 failed.
> beam1 has the JM running.
>
> Amir-
>
> java.lang.RuntimeException: Pipeline execution failed
>         at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
>         at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
>         at
> benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>         at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>         at
> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>         at
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The
> program execution failed: Job execution failed.
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:381)
>         at
> org.apache.flink.client.program.Client.runBlocking(Client.java:355)
>         at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
>         at
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
>         at
> org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
>         ... 14 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>         at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: The slot in which the task was executed
> has been released. Probably loss of TaskManager
> 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL: akka.tcp://
> [email protected]:44399/user/taskmanager
>         at
> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
>         at
> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
>         at
> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
>         at
> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
>         at
> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
>         at
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>         at
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>         at
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at
> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>         at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
>         at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:486)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         ... 2 more
>
