Re: Flink survey by data Artisans

2016-11-18 Thread amir bahmanyari



  From: Kostas Tzoumas 
 To: "d...@flink.apache.org" ; user@flink.apache.org 
 Sent: Friday, November 18, 2016 7:28 AM
 Subject: Flink survey by data Artisans
   
Hi everyone!

The Apache Flink community has evolved quickly over the past 2+ years, and
there are now many production Flink deployments in organizations of all
sizes.  This is both exciting and humbling :-)

data Artisans is running a brief survey to understand Apache Flink usage
and the needs of the community. We hope this survey will help identify
common usage patterns and pinpoint the most-needed features for Flink.

At the conclusion of the survey, we'll share a report summarizing the
findings with the community. All responses will remain confidential,
and only aggregate statistics will be shared.

I expect the survey to take 5-10 minutes, and all questions are
optional--we appreciate any feedback that you're willing to provide.

As a thank you, respondents will be entered in a drawing to win one of 10
tickets to Flink Forward 2017 (your choice of Berlin or the first-ever San
Francisco edition).

The survey is available here: http://www.surveygizmo.com/s3/3166399/181bdb611f22

Looking forward to hearing back from you!

Best,
Kostas


   

Re: Why did the Flink Cluster JM crash?

2016-11-10 Thread amir bahmanyari
Hi Till. I just checked, and my test finished after 19 hours with the config below. The expected Linear Road test time is 3.5 hours. I achieved this for half the data I sent yesterday, but for 105 GB worth of tuples I get 19 hours. No exceptions, no errors. Clean, but almost 5 times slower than expected. Thanks again.
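For context, the figures above imply the following rates (simple arithmetic only; the byte count assumes "105 G" means 105 GB):

```java
// Throughput implied by the figures above: 105 GB in 3.5 h (expected)
// vs. 19 h (observed).
public class Throughput {
    public static void main(String[] args) {
        double gigabytes = 105.0;
        double expectedHours = 3.5;
        double observedHours = 19.0;

        double expectedMBps = gigabytes * 1024 / (expectedHours * 3600);
        double observedMBps = gigabytes * 1024 / (observedHours * 3600);

        System.out.printf("expected: %.1f MB/s, observed: %.1f MB/s%n",
                expectedMBps, observedMBps);          // expected: 8.5, observed: 1.6
        System.out.printf("slowdown: %.1fx%n",
                observedHours / expectedHours);       // slowdown: 5.4x
    }
}
```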


  From: amir bahmanyari <amirto...@yahoo.com>
 To: Till Rohrmann <trohrm...@apache.org> 
Cc: "user@flink.apache.org" <user@flink.apache.org>
 Sent: Thursday, November 10, 2016 9:35 AM
 Subject: Re: Why did the Flink Cluster JM crash?
   
Thanks Till. I did all of that, with one difference: I have only 1 topic, with 64 partitions correlating to the total number of slots across all Flink servers. Can you elaborate on "As long as you have more Kafka topics than Flink Kafka consumers (subtasks)", please? Perhaps that's the bottleneck in my config and object creation. I send data to 1 topic across a 2-node Kafka cluster with 64 partitions, and KafkaIO in the Beam app is set to receive from it. How does "more Kafka topics" translate to KafkaIO settings in the Beam API?
Thanks + regards,
Amir
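On the partitions-vs-consumers question, a simplified model may help (illustrative only; this is not Flink's exact internal assignment code): each partition of the topic is assigned to exactly one consumer subtask, e.g. round-robin, so 64 partitions feed 64 subtasks one partition each, and any subtasks beyond the partition count sit idle.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how Kafka partitions are spread over consumer
// subtasks (round-robin). Not Flink's exact algorithm -- just the idea.
public class PartitionAssignment {
    static List<List<Integer>> assign(int numPartitions, int numSubtasks) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < numSubtasks; i++) subtasks.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            subtasks.get(p % numSubtasks).add(p); // partition p -> subtask p mod n
        }
        return subtasks;
    }

    public static void main(String[] args) {
        // 64 partitions over 64 subtasks: one partition each, none idle.
        List<List<Integer>> even = assign(64, 64);
        System.out.println("subtask 0 reads partitions " + even.get(0)); // [0]

        // 64 partitions over 128 subtasks: half the subtasks sit idle.
        List<List<Integer>> sparse = assign(64, 128);
        long idle = sparse.stream().filter(List::isEmpty).count();
        System.out.println("idle subtasks: " + idle); // 64
    }
}
```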
  From: Till Rohrmann <trohrm...@apache.org>
 To: amir bahmanyari <amirto...@yahoo.com> 
Cc: "user@flink.apache.org" <user@flink.apache.org>
 Sent: Thursday, November 10, 2016 2:13 AM
 Subject: Re: Why did the Flink Cluster JM crash?
  
The amount of data should be fine. Try to set the number of slots to the number 
of cores you have available.
As long as you have more Kafka topics than Flink Kafka consumers (subtasks) you 
should be fine. But I think you can also decrease the number of Kafka 
partitions a little bit. I guess that an extensive number of partitions also 
comes with a price. But I'm no expert there.
Hope your experiments run well with these settings.
Cheers,
Till
On Wed, Nov 9, 2016 at 8:02 PM, amir bahmanyari <amirto...@yahoo.com> wrote:

Thanks Till. I have been trying out many, many configuration combinations to get to the peak of what I can get as reasonable performance. And yes, when I drop the number of slots, I don't get the OOM. However, I don't get the response I want either. The amount of data I send is kinda huge: about 105 GB, sent in a stretch of 3.5 hours to a 4-node cluster running my Beam app, receiving from a 2-node Kafka cluster. From what I understand, you are suggesting that to get the best performance, the total number of slots should equal the total number of cores distributed across the cluster. To make sure we have done that, I will go back and repeat the testing with that in mind. FYI, the Kafka partitions are 4096; roughly 1024 per 16 cores per node. Is this reasonable? Once I know the answer to this question, I will readjust my config and repeat the test. I appreciate your response.
Amir

  From: Till Rohrmann <till.rohrm...@gmail.com>
 To: amir bahmanyari <amirto...@yahoo.com> 
Cc: "user@flink.apache.org" <user@flink.apache.org>
 Sent: Wednesday, November 9, 2016 1:27 AM
 Subject: Re: Why did the Flink Cluster JM crash?
   
Hi Amir,
I fear that 900 slots per task manager is a bit too many unless your machine 
has 900 cores. As a rule of thumb you should allocate as many slots as your 
machines have cores. Maybe you could try to decrease the number of slots and 
see if you still observe an OOM error.
Cheers,
Till
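Applied to this cluster, the slots-per-cores rule of thumb would look roughly like this in flink-conf.yaml; a sketch only, assuming the 16 cores per node that Amir mentions above, not a tested configuration:

```yaml
# Sketch: slots sized to cores, assuming 16 cores per node.
taskmanager.numberOfTaskSlots: 16
# 4 task managers x 16 slots = 64 total slots available
parallelism.default: 64
```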
On Wed, Nov 9, 2016 at 12:10 AM, amir bahmanyari <amirto...@yahoo.com> wrote:

Ok. There is an OOM exception... but this used to work fine with the same configurations. There are four nodes: beam1 through beam4. The Kafka partitions are 4096 > 3584 degree of parallelism.

jobmanager.rpc.address: beam1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 896
taskmanager.memory.preallocate: false
parallelism.default: 3584
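The numbers in this configuration are at least internally consistent, as a quick check shows (plain arithmetic on the values quoted above):

```java
// Back-of-envelope check of the cluster sizing quoted above.
public class ClusterSizing {
    public static void main(String[] args) {
        int nodes = 4;                 // beam1 .. beam4
        int slotsPerTaskManager = 896; // taskmanager.numberOfTaskSlots
        int defaultParallelism = 3584; // parallelism.default
        int kafkaPartitions = 4096;

        int totalSlots = nodes * slotsPerTaskManager;
        System.out.println("total slots = " + totalSlots);        // 3584
        System.out.println(totalSlots == defaultParallelism);     // true
        System.out.println(kafkaPartitions > defaultParallelism); // true
    }
}
```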

Thanks for your valuable time Till.
AnonymousParDo -> AnonymousParDo (3584/3584) (ebe8da5bda017ee31ad774c5bc5e5e88) switched from DEPLOYING to RUNNING
2016-11-08 22:51:44,471 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (3573/3584) (ddf5a8939c1fc4ad1e6d71f17fe5ab0b) switched from RUNNING to FAILED
2016-11-08 22:51:44,474 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (1/3584) (865c54432153a0230e62bf7610118ff8) switched from RUNNING to CANCELING
2016-11-08 22:51:44,474 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job e61cada683c0f7a709101c26c2c9a17c (benchbeamrunners-abahman-1108225128) changed to FAILING.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ThreadPoolE


Re: Why did the Flink Cluster JM crash?

2016-11-08 Thread amir bahmanyari
Ok. There is an OOM exception... but this used to work fine with the same configurations. There are four nodes: beam1 through beam4. The Kafka partitions are 4096 > 3584 degree of parallelism.

jobmanager.rpc.address: beam1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 896
taskmanager.memory.preallocate: false
parallelism.default: 3584

Thanks for your valuable time Till.
AnonymousParDo -> AnonymousParDo (3584/3584) (ebe8da5bda017ee31ad774c5bc5e5e88) switched from DEPLOYING to RUNNING
2016-11-08 22:51:44,471 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (3573/3584) (ddf5a8939c1fc4ad1e6d71f17fe5ab0b) switched from RUNNING to FAILED
2016-11-08 22:51:44,474 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Read(UnboundedKafkaSource) -> AnonymousParDo -> AnonymousParDo (1/3584) (865c54432153a0230e62bf7610118ff8) switched from RUNNING to CANCELING
2016-11-08 22:51:44,474 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job e61cada683c0f7a709101c26c2c9a17c (benchbeamrunners-abahman-1108225128) changed to FAILING.
java.lang.OutOfMemoryError: unable to create new native thread
    at java.lang.Thread.start0(Native Method)
    at java.lang.Thread.start(Thread.java:714)
    at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
    at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1587)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:334)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.Executors$DelegatedScheduledExecutorService.schedule(Executors.java:729)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.registerTimer(StreamTask.java:652)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.registerTimer(AbstractStreamOperator.java:250)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.registerTimer(StreamingRuntimeContext.java:92)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.setNextWatermarkTimer(UnboundedSourceWrapper.java:381)
    at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:233)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
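"unable to create new native thread" points to an OS-level thread limit rather than a heap problem: every running subtask needs at least one thread, and the UnboundedSourceWrapper in the trace above schedules watermark timers on additional executor threads. A back-of-envelope estimate, where the threads-per-subtask factor and overhead are illustrative assumptions, not measured Flink numbers:

```java
// Rough estimate of native threads needed per TaskManager, to show why
// the OOM above is an OS thread limit rather than a heap problem.
// The per-subtask factor and overhead are illustrative assumptions.
public class ThreadEstimate {
    public static void main(String[] args) {
        int subtasksPerNode = 896;  // taskmanager.numberOfTaskSlots above
        int threadsPerSubtask = 2;  // assumed: task thread + timer/IO thread
        int runtimeOverhead = 200;  // assumed: network, RPC, GC, JIT threads

        int estimate = subtasksPerNode * threadsPerSubtask + runtimeOverhead;
        System.out.println("estimated threads per TaskManager: " + estimate); // 1992

        // Many Linux distros default the per-user process/thread limit
        // (ulimit -u) to a few thousand, which such a setup can exceed.
        System.out.println(estimate > 1024); // true
    }
}
```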



Re: Why did the Flink Cluster JM crash?

2016-11-08 Thread amir bahmanyari
Oops! Sorry Till. I replicated it, and I see exceptions in the JM logs. How can I send the logs to you? Or what "interesting" part of them do you need, so I can copy/paste it here? Thanks


  From: Till Rohrmann <till.rohrm...@gmail.com>
 To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com> 
 Sent: Tuesday, November 8, 2016 2:11 PM
 Subject: Re: Why did the Flink Cluster JM crash?
   
Hi Amir,
what does the JM logs say?
Cheers,
Till

Re: Why did the Flink Cluster JM crash?

2016-11-08 Thread amir bahmanyari
Clean. No errors... no exceptions :-( Thanks Till.


Why did the Flink Cluster JM crash?

2016-11-08 Thread amir bahmanyari
Hi colleagues, I started the cluster all fine and started the Beam app running in the Flink Cluster fine; the Dashboard showed all tasks being consumed and open for business. I started sending data to the Beam app, and all of a sudden the Flink JM crashed. Exceptions below.
Thanks + regards,
Amir
java.lang.RuntimeException: Pipeline execution failed
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:113)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:48)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:183)
    at benchmark.flinkspark.flink.BenchBeamRunners.main(BenchBeamRunners.java:622)  //p.run();
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:118)
    at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:110)
    ... 14 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:140)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
    ... 18 more
Caused by: org.apache.flink.runtime.client.JobClientActorConnectionTimeoutException: Lost connection to the JobManager.
    at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:244)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
    at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at
akka.dispatch.Mailbox.run(Mailbox.scala:221)        at 
akka.dispatch.Mailbox.exec(Mailbox.scala:231)        at 
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)        
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
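
The root cause above is a JobClientActorConnectionTimeoutException: the client-side
actor stopped hearing from the JobManager. If the JM process is actually alive but
unresponsive under load (rather than crashed), one hedged mitigation is to raise the
Akka timeouts in flink-conf.yaml. The values below are illustrative, not tuned, and
not a confirmed fix for this particular crash:

```yaml
# Illustrative values only -- raise client/actor timeouts so a busy
# JobManager is not declared lost prematurely.
akka.ask.timeout: 60 s
akka.lookup.timeout: 30 s
akka.watch.heartbeat.interval: 10 s
akka.watch.heartbeat.pause: 120 s
```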

Re: Flink Metrics

2016-10-17 Thread amir bahmanyari
Hi colleagues, is there a link that describes Flink Metrics and provides examples
of how to use them, please? I really appreciate it... Cheers

  From: Till Rohrmann 
 To: user@flink.apache.org 
Cc: d...@flink.apache.org
 Sent: Monday, October 17, 2016 12:52 AM
 Subject: Re: Flink Metrics
   
Hi Govind,

I think the DropwizardMeterWrapper implementation is just a reference
implementation where it was decided to report the minute rate. You can
define your own meter class which allows to configure the rate interval
accordingly.

Concerning Timers, I think nobody requested this metric so far. If you
want, then you can open a JIRA issue and contribute it. The community would
really appreciate that.

Cheers,
Till
​

On Mon, Oct 17, 2016 at 5:26 AM, Govindarajan Srinivasaraghavan <
govindragh...@gmail.com> wrote:

> Hi,
>
> I am currently using flink 1.2 snapshot and instrumenting my pipeline with
> flink metrics. One small suggestion I have is currently the Meter interface
> only supports getRate() which is always the one minute rate.
>
> It would great if all the rates (1 min, 5 min & 15 min) are exposed to get
> a better picture in terms of performance.
>
> Also is there any reason why timers are not part of flink metrics core?
>
> Regards,
> Govind
>
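
Till's suggestion above — define your own meter class that exposes a different rate
interval — can be sketched as follows. The Meter interface is reproduced locally so
the example compiles on its own; in a real job you would implement
org.apache.flink.metrics.Meter (and typically wrap com.codahale.metrics.Meter to
return getFiveMinuteRate(), mirroring DropwizardMeterWrapper). The class name
LifetimeRateMeter is hypothetical:

```java
import java.util.concurrent.atomic.LongAdder;

// Local stand-in for org.apache.flink.metrics.Meter, reproduced here only
// so the sketch is self-contained.
interface Meter {
    void markEvent();
    void markEvent(long n);
    double getRate();
    long getCount();
}

// A Meter reporting events-per-second over the meter's whole lifetime --
// a simple alternative when only the one-minute rate is exposed. A
// production version would instead wrap a Dropwizard Meter and delegate
// getRate() to getFiveMinuteRate() or getFifteenMinuteRate().
class LifetimeRateMeter implements Meter {
    private final LongAdder count = new LongAdder();
    private final long startNanos = System.nanoTime();

    public void markEvent() { count.increment(); }
    public void markEvent(long n) { count.add(n); }
    public long getCount() { return count.sum(); }
    public double getRate() {
        double elapsedSec = (System.nanoTime() - startNanos) / 1e9;
        return elapsedSec > 0 ? getCount() / elapsedSec : 0.0;
    }
}

class MeterSketch {
    public static void main(String[] args) {
        Meter m = new LifetimeRateMeter();
        m.markEvent();
        m.markEvent(4);
        System.out.println(m.getCount()); // prints 5
    }
}
```

Registering such a meter on a Flink operator's metric group would then work exactly
like registering the built-in wrapper.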

   

Re: How can I prove ....

2016-09-30 Thread amir bahmanyari
Hi Stephan, this is from the dashboard. Total parallelism is set to 1024; 259
tasks per TM. All say Running, but I get a *.out log on the beam4 server only (bottom
of the servers list). Does this mean that all nodes are engaged in processing
the data? Why are these encircled columns showing 0's for their data-exchange
report? Thanks + regards, Amir-




  From: Stephan Ewen <se...@apache.org>
 To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com> 
Cc: Felix Dreissig <f...@f30.me>
 Sent: Monday, September 26, 2016 2:18 AM
 Subject: Re: How can I prove 
   
You do not need to create any JSON.
Just click on "Running Jobs" in the UI, and then on the job. The parallelism is 
shown as a number in the boxes of the graph.
On Sat, Sep 24, 2016 at 6:28 PM, amir bahmanyari <amirto...@yahoo.com> wrote:

Thanks Felix. Interesting. I tried to create the JSON but it didn't work according
to the sample code I found in the docs. There is a way to get the same JSON from
the command line. Is there an example? Thanks + regards, Amir-

  From: Felix Dreissig <f...@f30.me>
 To: amir bahmanyari <amirto...@yahoo.com> 
Cc: user@flink.apache.org
 Sent: Saturday, September 24, 2016 8:18 AM
 Subject: Re: How can I prove ....
   
Hi Amir,

On 23 Sep 2016, at 19:57, amir bahmanyari <amirto...@yahoo.com> wrote:
> Currently running with 512 slots all taken as indicated by the dashboard.
> Are we talking about this? Then yes based on no available slots, I assume I 
> am at 512 .

I guess Stephan is referring to the parallelism of single operators as 
displayed in the operator graph, see e.g.
https://ci.apache.org/projects/flink/flink-docs-release-0.10/page/img/webclient_plan_view.png .

Regards,
Felix

   



   

Re: How can I prove ....

2016-09-24 Thread amir bahmanyari
Thanks Felix. Interesting. I tried to create the JSON but it didn't work according
to the sample code I found in the docs. There is a way to get the same JSON from
the command line. Is there an example? Thanks + regards, Amir-

  From: Felix Dreissig <f...@f30.me>
 To: amir bahmanyari <amirto...@yahoo.com> 
Cc: user@flink.apache.org
 Sent: Saturday, September 24, 2016 8:18 AM
 Subject: Re: How can I prove 
   
Hi Amir,

On 23 Sep 2016, at 19:57, amir bahmanyari <amirto...@yahoo.com> wrote:
> Currently running with 512 slots all taken as indicated by the dashboard.
> Are we talking about this? Then yes based on no available slots, I assume I 
> am at 512 .

I guess Stephan is referring to the parallelism of single operators as 
displayed in the operator graph, see e.g. 
https://ci.apache.org/projects/flink/flink-docs-release-0.10/page/img/webclient_plan_view.png
 .

Regards,
Felix

   

Re: How can I prove ....

2016-09-22 Thread amir bahmanyari
Hi again, the following is from the dashboard while everything is supposedly
running. No real-time change in send/received/# of records... but one node is
definitely producing a *.out file... And all TMs are reporting in their *.log
files. And the process will eventually end, but very slowly. Thanks again,
Aljoscha.



  From: amir bahmanyari <amirto...@yahoo.com>
 To: Aljoscha Krettek <aljos...@apache.org>; User <user@flink.apache.org> 
 Sent: Thursday, September 22, 2016 9:16 AM
 Subject: Re: How can I prove 
   
Thanks Aljoscha, that's why I am wondering about this. I don't see the send/receive
columns change at all... just 0's all the time. The only thing that changes is the
timestamp. Is this an indication that the nodes in the cluster are not
participating in execution of the data? Thanks again. Amir-

  From: Aljoscha Krettek <aljos...@apache.org>
 To: amir bahmanyari <amirto...@yahoo.com>; User <user@flink.apache.org> 
 Sent: Thursday, September 22, 2016 5:01 AM
 Subject: Re: How can I prove 
  
Hi, depending on the data source you might not be able to stress CPU/MEM, because
the source might be too slow. As long as you see the numbers increasing in the
Flink Dashboard for all operators, you should be good.
Cheers, Aljoscha
Cheers,Aljoscha
On Thu, 22 Sep 2016 at 00:26 amir bahmanyari <amirto...@yahoo.com> wrote:

That all nodes in a Flink Cluster are involved simultaneously in processing the
data? Programmatically, graphically... I need to stress CPU, MEM, and all
resources to their max. How can I guarantee this is happening in the Flink
Cluster? Out of 4 nodes, this is the highest resource usage I see from
"top"... Everything else is not even close...

top - 22:22:45 up 41 days,  2:39,  1 user,  load average: 1.76, 1.55, 1.28
Tasks: 344 total,   1 running, 343 sleeping,   0 stopped,   0 zombie
%Cpu(s):  5.4 us,  1.0 sy,  0.0 ni, 93.5 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  11551564+total, 65702020 used, 49813632 free,   115072 buffers
KiB Swap:        0 total,        0 used,        0 free.  3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this... And
that's where truly realistic perf numbers start showing up. Thanks + regards, Amir-


   

   

Re: How can I prove ....

2016-09-22 Thread amir bahmanyari
Thanks Aljoscha, that's why I am wondering about this. I don't see the send/receive
columns change at all... just 0's all the time. The only thing that changes is the
timestamp. Is this an indication that the nodes in the cluster are not
participating in execution of the data? Thanks again. Amir-

  From: Aljoscha Krettek <aljos...@apache.org>
 To: amir bahmanyari <amirto...@yahoo.com>; User <user@flink.apache.org> 
 Sent: Thursday, September 22, 2016 5:01 AM
 Subject: Re: How can I prove 
   
Hi, depending on the data source you might not be able to stress CPU/MEM, because
the source might be too slow. As long as you see the numbers increasing in the
Flink Dashboard for all operators, you should be good.
Cheers,Aljoscha
On Thu, 22 Sep 2016 at 00:26 amir bahmanyari <amirto...@yahoo.com> wrote:

That all nodes in a Flink Cluster are involved simultaneously in processing the
data? Programmatically, graphically... I need to stress CPU, MEM, and all
resources to their max. How can I guarantee this is happening in the Flink
Cluster? Out of 4 nodes, this is the highest resource usage I see from
"top"... Everything else is not even close...

top - 22:22:45 up 41 days,  2:39,  1 user,  load average: 1.76, 1.55, 1.28
Tasks: 344 total,   1 running, 343 sleeping,   0 stopped,   0 zombie
%Cpu(s):  5.4 us,  1.0 sy,  0.0 ni, 93.5 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  11551564+total, 65702020 used, 49813632 free,   115072 buffers
KiB Swap:        0 total,        0 used,        0 free.  3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this... And
that's where truly realistic perf numbers start showing up. Thanks + regards, Amir-


   

How can I prove ....

2016-09-21 Thread amir bahmanyari
That all nodes in a Flink Cluster are involved simultaneously in processing the
data? Programmatically, graphically... I need to stress CPU, MEM, and all
resources to their max. How can I guarantee this is happening in the Flink
Cluster? Out of 4 nodes, this is the highest resource usage I see from
"top"... Everything else is not even close...

top - 22:22:45 up 41 days,  2:39,  1 user,  load average: 1.76, 1.55, 1.28
Tasks: 344 total,   1 running, 343 sleeping,   0 stopped,   0 zombie
%Cpu(s):  5.4 us,  1.0 sy,  0.0 ni, 93.5 id,  0.1 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:  11551564+total, 65702020 used, 49813632 free,   115072 buffers
KiB Swap:        0 total,        0 used,        0 free.  3148420 cached Mem

I am pretty sure I can push FlinkRunner to a much greater extent than this... And
that's where truly realistic perf numbers start showing up. Thanks + regards, Amir-

Re: Flink Cluster Load Distribution Question

2016-09-19 Thread Amir Bahmanyari
Thanks.
Could you elaborate on writing to all partitions and not just one, please?
How can I make sure?
I see all partitions consumed in the dashboard, and they get listed when my Beam
app starts and the KafkaIO read operation gets associated with every single partition.
What else?
Thanks so much again

Sent from my iPhone

> On Sep 18, 2016, at 10:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> good to see that you're making progress! The number of partitions in the 
> Kafka topic should be >= the number of parallel Flink Slots and the 
> parallelism with which you start the program. You also have to make sure to 
> write to all partitions and not just to one.
> 
> Cheers,
> Aljoscha
> 
>> On Sun, 18 Sep 2016 at 21:50 amir bahmanyari <amirto...@yahoo.com> wrote:
>> Hi Aljoscha,
>> Thanks for your kind response.
>> - We are really benchmarking Beam & its Runners and it happened that we 
>> started with Flink.
>> therefore, any change we make to the approach must be a Beam code change 
>> that automatically affects the underlying runner.
>> - I changed the TextIO() back to KafkaIO() reading from a Kafka cluster 
>> instead of a single node. Its behaving fine except that I am getting out of 
>> disk space by Kafka broker
>> & am working around it as we speak.
>> - I removed Redis as per your recommendation & replaced it with Java 
>> Concurrenthashmaps...Started to be a lot faster than before for sure.
>> I cannot use a FLink specific solution for this. Must be either an external 
>> something or a Beam solution or just JVM solution. I picked 
>> Concurrenthashmaps for now.
>> If I get by the Kafka  broker disk space issue, and dont get an out of 
>> memory by the flink servers in 3 hrs of runtime, I should be starting seeing 
>> the light :)) 
>> Pls keep your fingers crossed as testing is underway for 10 express ways of 
>> linear road and thats 9 GB of tuples expected to be processed in 3.5 hrs.
>> - Kafka partitions in the kafka topic = total number of slots available in 
>> flink servers. Should I alter that for better performance?
>> 
>> Thanks Aljoscha & have a great weekend.
>> Amir-
>> 
>> From: Aljoscha Krettek <aljos...@apache.org>
>> To: Amir Bahmanyari <amirto...@yahoo.com>; user <user@flink.apache.org> 
>> Sent: Sunday, September 18, 2016 1:48 AM
>> Subject: Re: Flink Cluster Load Distribution Question
>> 
>> This is not related to Flink, but in Beam you can read from a directory 
>> containing many files using something like this (from MinimalWordCount.java 
>> in Beam):
>> 
>> TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")
>> 
>> This will read all the files in the directory in parallel.
>> 
>> For reading from Kafka I wrote this on another thread of yours:
>> Are you sure that all your Kafka partitions contain data. Did you have a 
>> look at the Kafka metrics to see how the individual partitions are filled? 
>> If only one partition contains data, then you will only read data in one 
>> parallel instance of the sources. How are you writing your data to Kafka?
>> 
>> Flink/Beam should read from all partitions if all of them contain data. 
>> Could you please verify that all Kafka partitions contain data by looking at 
>> the metrics of your Kafka cluster, that would be a first step towards 
>> finding out where the problem lies.
>> 
>> By the way, your code uses Beam in a highly non-idiomatic way. Interacting 
>> with an outside database, such as Redis, will always be the bottleneck in 
>> such a job. Flink provides an abstraction for dealing with state that is 
>> vastly superior to using an external system. We recently did a blog post 
>> about rewriting a similar streaming use case using Flink's internal state: 
>> http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe 
>> that's interesting for you.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <amirto...@yahoo.com> wrote:
>> Thanks so much Aljoscha 
>> Is there an example that shows how to read from multiple files accurately or 
>> from KafkaIO and get perfect parallelism pls?
>> Have a great weekend
>> 
>> Sent from my iPhone
>> 
>>> On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> One observation here is that you're only reading from one file. This will 
>>> mean that you won't get any parallelism. Everything is executed on just one 
>>> task/thread.
>>> 
>&g
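
Aljoscha's advice above — make sure data is written to all Kafka partitions, not
just one — can be illustrated with a small stand-alone sketch. This toy partitioner
is hypothetical; it mimics, but is not, Kafka's default murmur2-based partitioner.
It shows why producing every record with the same key funnels all data into a single
partition, which in turn leaves most parallel Flink source instances idle:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of producer-side partition assignment (illustration only):
// a keyed record always maps to hash(key) mod numPartitions, while an
// unkeyed (null-key) record is spread round-robin across partitions.
class ToyPartitioner {
    private final int numPartitions;
    private final AtomicInteger roundRobin = new AtomicInteger();

    ToyPartitioner(int numPartitions) { this.numPartitions = numPartitions; }

    int partition(String key) {
        if (key == null) {  // no key: spread records evenly
            return Math.floorMod(roundRobin.getAndIncrement(), numPartitions);
        }
        // keyed: deterministic -- one key always lands in one partition
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}

class PartitionDemo {
    public static void main(String[] args) {
        ToyPartitioner p = new ToyPartitioner(64);
        // Fixed key: every record lands in the same partition.
        System.out.println(p.partition("road-1") == p.partition("road-1")); // prints true
        // Null key: consecutive records hit different partitions.
        System.out.println(p.partition(null) != p.partition(null)); // prints true
    }
}
```

So if the load generator writes all 64-partition traffic under one fixed key, only
one partition — and hence one parallel source — receives data; spreading keys (or
sending with a null key) is what engages the whole cluster.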

Re: Fw: Flink Cluster Load Distribution Question

2016-09-14 Thread amir bahmanyari
Hi Aljoscha, the JM log is also attached. Seems like everything is OK and
assigned... to all nodes... Not sure why I don't get performance?
:-( Thanks + regards, Amir-

  From: Aljoscha Krettek <aljos...@apache.org>
 To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com> 
 Sent: Wednesday, September 14, 2016 1:48 AM
 Subject: Re: Fw: Flink Cluster Load Distribution Question
   
Hi,this is a different job from the Kafka Job that you have running, right?
Could you maybe post the code for that as well?
Cheers,Aljoscha
On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:

Hi Robert, sure, I am forwarding it to user. Sorry about that. I followed the
"robot's" instructions :)) Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110
GB). Let's call them node1, 2, 3, 4. Flink is clustered with node1 running the JM & a
TM; three more TMs run on node2, 3, and 4 respectively. I have Beam
running the Flink Runner underneath. The input data is received by Beam TextIO()
reading off 1.6 GB of data containing roughly 22 million tuples. All nodes
have identical flink-conf.yaml, masters & slaves contents as follows:
flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096


masters: node1:8081
slaves:
node1
node2
node3
node4

Everything looks normal at ./start-cluster.sh & all daemons start on all
nodes. JM and TM log files get generated on all nodes. The Dashboard shows all
slots being used. I deploy the Beam app to the cluster where the JM is running
on node1. A *.out file gets generated as data is being processed; no *.out on the
other nodes, just node1 where I deployed the fat jar. I tail -f the *.out log on
node1 (master). It starts fine... but slowly degrades & becomes extremely slow. As
we speak, I started the Beam app 13 hrs ago and it's still running. How can I
prove that ALL NODES are involved in processing the data at the same time, i.e.
clustered? Do the above configurations look OK for reasonable performance? Given
the above parameters, how can I improve the performance in this cluster? What
other information and/or dashboard screenshots are needed to clarify this issue?
I used these websites to do the configuration: Apache Flink:
Cluster Setup

  

Apache Flink: Configuration


  
In the second link, there is a config recommendation for the following, but this
parameter is not in the configuration file out of the box:
   - taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value
(i.e. 32 KB) doesn't get picked up? Sorry, too many questions. Please let me know. I
appreciate your help. Cheers, Amir-
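
On the numberOfBuffers value above: the Flink configuration guide of that era gives
a rule of thumb of roughly #slots-per-TM² × #TMs × 4 network buffers. A
back-of-the-envelope check against this cluster (a sanity check, not a tuning
recommendation):

```yaml
# Rule of thumb from the Flink configuration guide:
#   numberOfBuffers ~= slots-per-TM^2 * #TMs * 4
#   here: 16 * 16 * 4 * 4 = 4096
taskmanager.network.numberOfBuffers: 4096
# The buffer size defaults to 32768 bytes (32 KB) and is used even when the
# key is absent from flink-conf.yaml; set it explicitly only to change it.
taskmanager.network.bufferSizeInBytes: 32768
```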
- Forwarded Message -
 From: Robert Metzger <rmetz...@apache.org>
 To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari 
<amirto...@yahoo.com> 
 Sent: Tuesday, September 13, 2016 1:15 AM
 Subject: Re: Flink Cluster Load Distribution Question
  
Hi Amir,

I would recommend to post such questions to the user@flink mailing list in
the future. This list is meant for development-related topics.

I think we need more details to understand why your application is not
running properly. Can you quickly describe what your topology is doing?
Are you setting the parallelism to a value >= 1 ?

Regards,
Robert


On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <
amirto...@yahoo.com.invalid> wrote:

> Hi Colleagues,Just joined this forum.I have done everything possible to
> get a 4-node Flink cluster to work properly & run a Beam app. It always
> generates system-output logs (*.out) in only one node. Its so slow
> for 4 nodes being there.Seems like the load is not distributed amongst all
> 4 nodes but only one node. Most of the time the one where JM runs.I
> run/tested it in a single node, and it took even faster to run the same
> load.Not sure whats not being configured right.1- why am I getting
> SystemOut .out log in only one server? All nodes get their TaskManager log
> files updated thu.2- why dont I see load being distributed amongst all 4
> nodes, but only one all the times.3- Why does the Dashboard show a 0 (zero)
> for Send/Receive numbers per all Task Managers.
> The Dashboard shows all the right stuff. Top shows not much of resources
> being stressed on any of the nodes.I can share its contents if it helps
> diagnosing the issue.Thanks + I appreciate your valuable time, response &
> help.Amir-


 


   

flink-abahman-jobmanager-1-beam1.log
Description: Binary data


Re: Fw: Flink Cluster Load Distribution Question

2016-09-14 Thread amir bahmanyari
Hi Aljoscha, thanks for your response. It's the same job, but I am reading through
TextIO() instead of a Kafka topic. I thought that would make a difference. It
doesn't. Same slowness in the Flink Cluster. I had sent you the code reading
from KafkaIO(). Nothing different except commenting out the KafkaIO() &
un-commenting TextIO(). It's attached along with the Support class. Is there
anything interesting you see in my configuration that may cause slowness and/or
lack of the right distribution in the cluster as a whole? I also attached my
config files on the JM node... same for the other nodes. Have a wonderful day &
thanks for your attention. Amir-


  From: Aljoscha Krettek <aljos...@apache.org>
 To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com> 
 Sent: Wednesday, September 14, 2016 1:48 AM
 Subject: Re: Fw: Flink Cluster Load Distribution Question
   
Hi,this is a different job from the Kafka Job that you have running, right?
Could you maybe post the code for that as well?
Cheers,Aljoscha
On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:

Hi Robert, sure, I am forwarding it to user. Sorry about that. I followed the
"robot's" instructions :)) Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110
GB). Let's call them node1, 2, 3, 4. Flink is clustered with node1 running the JM & a
TM; three more TMs run on node2, 3, and 4 respectively. I have Beam
running the Flink Runner underneath. The input data is received by Beam TextIO()
reading off 1.6 GB of data containing roughly 22 million tuples. All nodes
have identical flink-conf.yaml, masters & slaves contents as follows:
flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096


masters: node1:8081
slaves:
node1
node2
node3
node4

Everything looks normal at ./start-cluster.sh & all daemons start on all
nodes. JM and TM log files get generated on all nodes. The Dashboard shows all
slots being used. I deploy the Beam app to the cluster where the JM is running
on node1. A *.out file gets generated as data is being processed; no *.out on the
other nodes, just node1 where I deployed the fat jar. I tail -f the *.out log on
node1 (master). It starts fine... but slowly degrades & becomes extremely slow. As
we speak, I started the Beam app 13 hrs ago and it's still running. How can I
prove that ALL NODES are involved in processing the data at the same time, i.e.
clustered? Do the above configurations look OK for reasonable performance? Given
the above parameters, how can I improve the performance in this cluster? What
other information and/or dashboard screenshots are needed to clarify this issue?
I used these websites to do the configuration: Apache Flink:
Cluster Setup

  

 

Apache Flink: Configuration


  

 
In the second link, there is a config recommendation for the following, but this
parameter is not in the configuration file out of the box:
   - taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value
(i.e. 32 KB) doesn't get picked up? Sorry, too many questions. Please let me know. I
appreciate your help. Cheers, Amir-
- Forwarded Message -
 From: Robert Metzger <rmetz...@apache.org>
 To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari 
<amirto...@yahoo.com> 
 Sent: Tuesday, September 13, 2016 1:15 AM
 Subject: Re: Flink Cluster Load Distribution Question
  
Hi Amir,

I would recommend to post such questions to the user@flink mailing list in
the future. This list is meant for development-related topics.

I think we need more details to understand why your application is not
running properly. Can you quickly describe what your topology is doing?
Are you setting the parallelism to a value >= 1 ?

Regards,
Robert


On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <
amirto...@yahoo.com.invalid> wrote:

> Hi Colleagues,Just joined this forum.I have done everything possible to
> get a 4-node Flink cluster to work properly & run a Beam app. It always
> generates system-output logs (*.out) in only one node. Its so slow
> for 4 nodes being there.Seems like the load is not distributed amongst all
> 4 nodes but only one node. Most of the time the one where JM runs.I
> run/tested it in a single node, and it took even faster to run the same
> load.Not sure whats not being configured right.1- why am I getting
> SystemOut .out log in only one server? All nodes get their TaskManager log
> files updated thu.2- why dont I see load being distributed amongst all 4
> nodes, but only one all the 

Fw: Flink Cluster Load Distribution Question

2016-09-13 Thread amir bahmanyari
Hi Robert, sure, I am forwarding it to user. Sorry about that. I followed the
"robot's" instructions :)) Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110
GB). Let's call them node1, 2, 3, 4. Flink is clustered with node1 running the JM & a
TM; three more TMs run on node2, 3, and 4 respectively. I have Beam
running the Flink Runner underneath. The input data is received by Beam TextIO()
reading off 1.6 GB of data containing roughly 22 million tuples. All nodes
have identical flink-conf.yaml, masters & slaves contents as follows:
flink-conf.yaml:
jobmanager.rpc.address: node1
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 102400
taskmanager.numberOfTaskSlots: 16
taskmanager.memory.preallocate: false
parallelism.default: 64
jobmanager.web.port: 8081
taskmanager.network.numberOfBuffers: 4096


masters: node1:8081
slaves:
node1
node2
node3
node4

Everything looks normal at ./start-cluster.sh & all daemons start on all
nodes. JM and TM log files get generated on all nodes. The Dashboard shows all
slots being used. I deploy the Beam app to the cluster where the JM is running
on node1. A *.out file gets generated as data is being processed; no *.out on the
other nodes, just node1 where I deployed the fat jar. I tail -f the *.out log on
node1 (master). It starts fine... but slowly degrades & becomes extremely slow. As
we speak, I started the Beam app 13 hrs ago and it's still running. How can I
prove that ALL NODES are involved in processing the data at the same time, i.e.
clustered? Do the above configurations look OK for reasonable performance? Given
the above parameters, how can I improve the performance in this cluster? What
other information and/or dashboard screenshots are needed to clarify this issue?
I used these websites to do the configuration: Apache Flink:
Cluster Setup

  

 

Apache Flink: Configuration


  

 
In the second link, there is a config recommendation for the following, but this
parameter is not in the configuration file out of the box:
   - taskmanager.network.bufferSizeInBytes
Should I include it manually? Does it make any difference if the default value
(i.e. 32 KB) doesn't get picked up? Sorry, too many questions. Please let me know. I
appreciate your help. Cheers, Amir-
- Forwarded Message -
 From: Robert Metzger <rmetz...@apache.org>
 To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari 
<amirto...@yahoo.com> 
 Sent: Tuesday, September 13, 2016 1:15 AM
 Subject: Re: Flink Cluster Load Distribution Question
   
Hi Amir,

I would recommend to post such questions to the user@flink mailing list in
the future. This list is meant for development-related topics.

I think we need more details to understand why your application is not
running properly. Can you quickly describe what your topology is doing?
Are you setting the parallelism to a value >= 1 ?

Regards,
Robert


On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <
amirto...@yahoo.com.invalid> wrote:

> Hi Colleagues,Just joined this forum.I have done everything possible to
> get a 4-node Flink cluster to work properly & run a Beam app. It always
> generates system-output logs (*.out) in only one node. Its so slow
> for 4 nodes being there.Seems like the load is not distributed amongst all
> 4 nodes but only one node. Most of the time the one where JM runs.I
> run/tested it in a single node, and it took even faster to run the same
> load.Not sure whats not being configured right.1- why am I getting
> SystemOut .out log in only one server? All nodes get their TaskManager log
> files updated thu.2- why dont I see load being distributed amongst all 4
> nodes, but only one all the times.3- Why does the Dashboard show a 0 (zero)
> for Send/Receive numbers per all Task Managers.
> The Dashboard shows all the right stuff. Top shows not much of resources
> being stressed on any of the nodes.I can share its contents if it helps
> diagnosing the issue.Thanks + I appreciate your valuable time, response &
> help.Amir-