Thanks Aljoscha,
Sure, will try 16 slots per node. So the degree of parallelism should be 16 x 4 = 64, I assume.
Question: Why would 16 slots per node perform better than 512? Doesn't a higher degree of parallelism imply higher app throughput?
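To make the slot arithmetic concrete, here is a minimal Java sketch. It is only an illustration: the class and method names are made up, the node count (4) and core count (16 per A11 VM) are the figures quoted elsewhere in this thread, and it says nothing about how Flink schedules tasks internally. It relates the configured slots per node to the cluster-wide slot total and to the number of slots sharing each CPU core.

```java
// Hypothetical helper, not part of Flink: relates configured task slots to CPU cores.
final class SlotSizingSketch {

    /** Total task slots across the cluster. */
    static int totalSlots(int slotsPerNode, int nodes) {
        return slotsPerNode * nodes;
    }

    /** How many slots share each CPU core on a single node. */
    static double slotsPerCore(int slotsPerNode, int coresPerNode) {
        return (double) slotsPerNode / coresPerNode;
    }

    public static void main(String[] args) {
        int nodes = 4;          // four-node Flink cluster, as described in this thread
        int coresPerNode = 16;  // Azure A11 VM, as listed in this thread

        // Compare the recommended setting (16) with the one currently configured (512).
        for (int slotsPerNode : new int[] {16, 512}) {
            System.out.printf("slots/node=%d, total slots=%d, slots per core=%.1f%n",
                    slotsPerNode,
                    totalSlots(slotsPerNode, nodes),
                    slotsPerCore(slotsPerNode, coresPerNode));
        }
    }
}
```

With 16 slots per node there is one slot per core and 64 slots in total. With 512 slots per node there are 32 slots per core, which suggests the extra slots would mostly compete for the same cores rather than add throughput.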
It's still a mystery why System.out.println() output goes to only one node's *.out files. And whenever there is a runtime issue, THAT NODE with the *.out files crashes and everything else is still up and running.
My understanding of a "cluster" is that we should get app output on all nodes in the cluster, depending on which record is being processed on which node.
The data is sent to the Kafka01 node from a Java client, which reads one record at a time from a data file and sends it to Kafka port 9092 on node Kafka01 (master), to the topic linroad3.
The Kafka01 and Kafka02 nodes are clustered, as indicated by the output of --describe below.
server.properties:
listeners=PLAINTEXT://:9092
--zookeeper kafka01:2181

Please let me know if this looks OK to you, and have a great weekend.
Thanks again.
Amir-


      From: Aljoscha Krettek <[email protected]>
 To: amir bahmanyari <[email protected]>; "[email protected]" 
<[email protected]> 
 Sent: Friday, November 4, 2016 3:24 PM
 Subject: Re: What does this exception mean to you?
   
Ok, with those machines I highly recommend setting the slot count to 16 at most.
For Kafka, the fact that the partitions are distributed across the machines 
(which they seem to be) does not guarantee that the data that is written is 
written evenly to all partitions. How is the data written to Kafka?
On Fri, 4 Nov 2016 at 19:31 amir bahmanyari <[email protected]> wrote:

Hi Aljoscha,
Sorry, the Kafka VMs are A10 (not Dxy). In case the images don't get there:
A11: 16 cores, 112 GB, 16 data disks, 16x500 max IOPS, load balancing
A10: 8 cores, 56 GB, 16 data disks, 16x500 max IOPS, load balancing
Thanks.
Amir-
      From: amir bahmanyari <[email protected]>
 To: "[email protected]" <[email protected]> 
 Sent: Friday, November 4, 2016 11:27 AM
 Subject: Re: What does this exception mean to you?
   
Hi Aljoscha,
Thanks for your reply.
I am using Microsoft Azure A11 VMs for a 4-node Beam/Flink cluster and A10 VMs for ingestion, i.e. the 2-node Kafka cluster given below.


After I create the partitions on Kafka, the describe option shows that they are evenly distributed between the two Kafka nodes. Or, at least, this is my understanding. Below is partial output, and it shows that all 2048 partitions are in use.
Thanks so much for your help. I hope we can draw some conclusions out of this and find the bottleneck.
Have a great weekend.
Amir-
[aba@kafka01 kafka_2.11-0.10.0.1]$ ./bin/kafka-topics.sh --describe --zookeeper kafka01:2181 --topic linroad3 | more
Topic:linroad3  PartitionCount:2048     ReplicationFactor:2     Configs:
        Topic: linroad3 Partition: 0    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 1    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 4    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 5    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 6    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 7    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 8    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 9    Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 10   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 11   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 12   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 13   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 14   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 15   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 16   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 17   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 18   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 19   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 20   Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 21   Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 22   Leader: 1       Replicas: 1,2   Isr: 1,2
        .................................................................................................
        Topic: linroad3 Partition: 2045 Leader: 2       Replicas: 2,1   Isr: 2,1
        Topic: linroad3 Partition: 2046 Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: linroad3 Partition: 2047 Leader: 2       Replicas: 2,1   Isr: 2,1


      From: Aljoscha Krettek <[email protected]>
 To: amir bahmanyari <[email protected]>; "[email protected]" 
<[email protected]> 
 Sent: Friday, November 4, 2016 10:44 AM
 Subject: Re: What does this exception mean to you?
  
Just out of curiosity, what machines are you running this on? I'm asking 
because the number of task slots should roughly correlate with the number of 
CPU cores.
On Fri, 4 Nov 2016 at 10:34 Aljoscha Krettek <[email protected]> wrote:

You should try and find out why everything is just happening on one node. Have 
you looked at your Kafka partitions, i.e. is the data evenly distributed across 
the partitions of your Kafka topic or is all data pushed to one partition? This 
would actually explain why processing is only happening on one node, namely 
the node that is reading the partition that has all the data.
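The skew scenario described above can be sketched in Java as follows. This is an assumption-laden illustration, not Kafka code: `partitionFor` is a hypothetical stand-in that hashes the key bytes with `Arrays.hashCode`, whereas the real producer's partitioning depends on its configuration and on whether records carry a key. The point is only that identical keys always map to the same partition, however many partitions the topic has.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustrative only: a stand-in for key-based partitioning, not Kafka's actual partitioner.
final class PartitionSkewSketch {

    /** Maps a key to a partition by hashing its bytes (hypothetical hash choice). */
    static int partitionFor(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions); // floorMod keeps the result non-negative
    }

    /** Counts how many records land in each partition. */
    static int[] countPerPartition(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }

    /** Number of partitions that received at least one record. */
    static long nonEmpty(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        int numPartitions = 2048; // partition count of the topic in this thread

        // Every record carries the same (made-up) key, so all records share one partition.
        List<String> sameKey = Collections.nCopies(10_000, "linroad");

        // Every record carries a distinct (made-up) key, so records spread out.
        List<String> distinctKeys = IntStream.range(0, 10_000)
                .mapToObj(i -> "key-" + i)
                .collect(Collectors.toList());

        System.out.println("same key, partitions used: "
                + nonEmpty(countPerPartition(sameKey, numPartitions)));
        System.out.println("distinct keys, partitions used: "
                + nonEmpty(countPerPartition(distinctKeys, numPartitions)));
    }
}
```

If the first case matches how the client writes, only the consumer task reading that single partition would have work to do, which would fit the one-busy-node symptom.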
On Thu, 3 Nov 2016 at 20:53 amir bahmanyari <[email protected]> wrote:

Thanks Aljoscha.
I have been tuning Flink memory, network buffers, etc. And this occurred on THAT ONE NODE where I see *.out logs get created by Flink.
I lowered the memory that Flink allocates from the default 70% to 50%, and this exception was thrown on that one node only. The other nodes were up and didn't crash.
There is SOMETHING different about THAT ONE NODE :-) I cannot figure it out.
At every ./start-cluster, THAT ONE NODE may or may not change, on a random basis. So I cannot just tune THAT ONE NODE. Next time, another node may become THAT ONE NODE.
I have the following set in flink-conf.yaml on each node:
akka.ask.timeout : 300s
jobmanager.heap.mb: 256 //Could this be too small?
taskmanager.heap.mb: 102400
taskmanager.memory.fraction: 0.6 //Changing this to a lower value causes the exception below. Am testing with 0.6 < 0.7 default.
taskmanager.numberOfTaskSlots: 512
taskmanager.memory.preallocate: false
parallelism.default: 2048
taskmanager.network.numberOfBuffers: 131072
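As a rough sanity check on the settings above, the memory claimed by the network buffers can be estimated as number of buffers times buffer size. The sketch below assumes 32 KiB per buffer (believed to be Flink's default segment size, but not stated in this thread), and the class and method names are hypothetical.

```java
// Hypothetical estimate, not Flink code: memory taken by network buffers.
final class NetworkBufferSketch {

    /** Bytes reserved for network buffers: buffer count times bytes per buffer. */
    static long networkBufferBytes(long numberOfBuffers, long bytesPerBuffer) {
        return numberOfBuffers * bytesPerBuffer;
    }

    /** Converts bytes to mebibytes. */
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long numberOfBuffers = 131072;    // taskmanager.network.numberOfBuffers from this thread
        long bytesPerBuffer = 32 * 1024;  // ASSUMPTION: 32 KiB per buffer, not stated in the thread
        long heapMb = 102400;             // taskmanager.heap.mb from this thread

        double bufferMiB = toMiB(networkBufferBytes(numberOfBuffers, bytesPerBuffer));
        System.out.printf("network buffers: %.0f MiB (%.1f%% of a %d MB heap)%n",
                bufferMiB, 100.0 * bufferMiB / heapMb, heapMb);
    }
}
```

Under that assumption the buffers take about 4 GiB, roughly 4% of the configured TaskManager heap, so they are unlikely to be the dominant memory consumer here.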


Appreciate any feedback.
Amir-
    From: Aljoscha Krettek <[email protected]>
 To: amir bahmanyari <[email protected]>; "[email protected]" 
<[email protected]> 
 Sent: Thursday, November 3, 2016 12:45 AM
 Subject: Re: What does this exception mean to you?
  
That looks like a Flink problem. The TaskManager on beam4 seems to have crashed 
for some reason. You might be able to find that reason by looking at the logs 
on that machine.
On Thu, 3 Nov 2016 at 04:53 amir bahmanyari <[email protected]> wrote:

Thanks and regards,
beam4 and beam1 are hostnames. BenchBeamRunners.java is my Beam app running in a four-server Flink cluster.
The other nodes are still running, except beam4, the one that failed. beam1 has the JM running.
Amir-
java.lang.RuntimeException: Pipeline execution failed
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
        at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
        at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
        at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
        ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 06dff71ba6ab965ec323c8ee6bf3d7d1 @ beam4 - 512 slots - URL: akka.tcp://[email protected]:44399/user/taskmanager
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        ... 2 more



 



   


   
