Thanks.
Could you elaborate on writing to all partitions and not just one, please?
How can I make sure?
I see all partitions consumed in the dashboard, and they get listed when my Beam 
app starts; the KafkaIO read operation gets associated with every single partition. 
What else?
Thanks so much again

Sent from my iPhone

> On Sep 18, 2016, at 10:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> 
> Hi,
> good to see that you're making progress! The number of partitions in the 
> Kafka topic should be >= the number of parallel Flink slots and the 
> parallelism with which you start the program. You also have to make sure to 
> write to all partitions, not just to one.
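One common pitfall behind "only one partition gets data" (an editorial sketch, not from the original thread): if every record is produced with the same key, a hash partitioner sends everything to a single partition, so only one parallel source instance ever sees data. The sketch below assumes a simplified hash-style partitioner; Kafka's DefaultPartitioner actually uses murmur2, and `String.hashCode()` here is a stand-in for illustration only.

```java
// Sketch only: Kafka's DefaultPartitioner uses murmur2 on the record key;
// String.hashCode() below is an illustrative stand-in.
import java.util.HashSet;
import java.util.Set;

public class PartitionSpread {
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 64;
        Set<Integer> varied = new HashSet<>();
        Set<Integer> fixed = new HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            varied.add(partitionFor("tuple-" + i, numPartitions)); // varied keys spread out
            fixed.add(partitionFor("same-key", numPartitions));    // one key -> one partition
        }
        System.out.println("partitions hit with varied keys: " + varied.size());
        System.out.println("partitions hit with a fixed key: " + fixed.size()); // always 1
    }
}
```

Producing with well-distributed keys (or null keys, which the producer spreads across partitions itself) avoids funneling everything into one partition.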
> 
> Cheers,
> Aljoscha
> 
>> On Sun, 18 Sep 2016 at 21:50 amir bahmanyari <amirto...@yahoo.com> wrote:
>> Hi Aljoscha,
>> Thanks for your kind response.
>> - We are really benchmarking Beam & its Runners and it happened that we 
>> started with Flink.
>> therefore, any change we make to the approach must be a Beam code change 
>> that automatically affects the underlying runner.
>> - I changed the TextIO() back to KafkaIO(), reading from a Kafka cluster 
>> instead of a single node. It's behaving fine except that the Kafka broker is 
>> running out of disk space, & I am working around it as we speak.
>> - I removed Redis as per your recommendation & replaced it with Java 
>> ConcurrentHashMaps... It is a lot faster than before, for sure.
>> I cannot use a Flink-specific solution for this. It must be either an external 
>> system, a Beam solution, or just a JVM solution. I picked 
>> ConcurrentHashMaps for now.
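A rough sketch of the Redis-to-ConcurrentHashMap swap described above (the names are invented for illustration; this is not the benchmark's actual code): a ConcurrentHashMap keeps per-key aggregates inside the JVM, avoiding the external round-trip, at the cost of the state being per-JVM and lost on failure.

```java
// Illustrative only: a ConcurrentHashMap standing in for Redis as a
// per-key aggregate store inside the JVM. All names are made up.
import java.util.concurrent.ConcurrentHashMap;

public class LocalState {
    private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

    // merge() is atomic, so parallel operator instances in the same JVM
    // can update a key without explicit locking or a network round-trip.
    void record(String expresswayId) {
        counts.merge(expresswayId, 1L, Long::sum);
    }

    long countFor(String expresswayId) {
        return counts.getOrDefault(expresswayId, 0L);
    }

    public static void main(String[] args) {
        LocalState state = new LocalState();
        state.record("xway-1");
        state.record("xway-1");
        state.record("xway-2");
        System.out.println(state.countFor("xway-1")); // 2
        System.out.println(state.countFor("xway-9")); // 0
    }
}
```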
>> If I get past the Kafka broker disk-space issue, and don't get an 
>> out-of-memory on the Flink servers in 3 hrs of runtime, I should start seeing 
>> the light :)) 
>> Please keep your fingers crossed, as testing is underway for 10 expressways of 
>> Linear Road, and that's 9 GB of tuples expected to be processed in 3.5 hrs.
>> - Kafka partitions in the Kafka topic = total number of slots available on the 
>> Flink servers. Should I alter that for better performance?
>> 
>> Thanks Aljoscha & have a great weekend.
>> Amir-
>> 
>> From: Aljoscha Krettek <aljos...@apache.org>
>> To: Amir Bahmanyari <amirto...@yahoo.com>; user <user@flink.apache.org> 
>> Sent: Sunday, September 18, 2016 1:48 AM
>> Subject: Re: Flink Cluster Load Distribution Question
>> 
>> This is not related to Flink, but in Beam you can read from a directory 
>> containing many files using something like this (from MinimalWordCount.java 
>> in Beam):
>> 
>> TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")
>> 
>> This will read all the files in the directory in parallel.
>> 
>> For reading from Kafka I wrote this on another thread of yours:
>> Are you sure that all your Kafka partitions contain data? Did you have a 
>> look at the Kafka metrics to see how the individual partitions are filled? 
>> If only one partition contains data, then you will only read data in one 
>> parallel instance of the sources. How are you writing your data to Kafka?
>> 
>> Flink/Beam should read from all partitions if all of them contain data. 
>> Could you please verify that all Kafka partitions contain data by looking at 
>> the metrics of your Kafka cluster? That would be a first step towards 
>> finding out where the problem lies.
>> 
>> By the way, your code uses Beam in a highly non-idiomatic way. Interacting 
>> with an outside database, such as Redis, will always be the bottleneck in 
>> such a job. Flink provides an abstraction for dealing with state that is 
>> vastly superior to using an external system. We recently did a blog post 
>> about rewriting a similar streaming use case using Flink's internal state: 
>> http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe 
>> that's interesting for you.
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <amirto...@yahoo.com> wrote:
>> Thanks so much Aljoscha 
>> Is there an example that shows how to read correctly from multiple files, or 
>> from KafkaIO, and get full parallelism, please?
>> Have a great weekend
>> 
>> Sent from my iPhone
>> 
>>> On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> One observation here is that you're only reading from one file. This will 
>>> mean that you won't get any parallelism. Everything is executed on just one 
>>> task/thread.
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> On Thu, 15 Sep 2016 at 01:24 amir bahmanyari <amirto...@yahoo.com> wrote:
>>> Hi Aljoscha,
>>> Experimenting on a relatively small file, with everything fixed except 
>>> KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink 
>>> cluster when reading tuples via TextIO().
>>> I understand the network involvement in reading from a Kafka topic etc., but 
>>> 50% is significant.
>>> Also, I experimented with 64 partitions in the Kafka topic vs. 400. I get the 
>>> exact same performance, & increasing the topic partitions doesn't improve anything.
>>> I thought some of the 64 slots might get extra parallelism, really 
>>> pushing them to their limit, but 64 Kafka topic partitions & 400 Kafka topic 
>>> partitions perform the same while #slots = 64.
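A possible explanation for the 64 vs. 400 result (an editorial sketch; the exact assignment logic in Beam's KafkaIO may differ): the number of parallel readers is capped by the pipeline parallelism, and extra partitions are simply dealt out round-robin to the readers that already exist, so 400 partitions on 64 slots yields no more parallel source instances than 64 partitions does.

```java
// Sketch of why more topic partitions than slots adds no parallelism:
// the reader count is fixed; partitions are dealt round-robin to them.
// (Illustrative; Beam's KafkaIO assignment may differ in detail.)
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignment {
    static List<List<Integer>> assign(int numPartitions, int numReaders) {
        List<List<Integer>> readers = new ArrayList<>();
        for (int r = 0; r < numReaders; r++) readers.add(new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            readers.get(p % numReaders).add(p); // round-robin deal
        }
        return readers;
    }

    public static void main(String[] args) {
        // 64 partitions on 64 readers: one partition each.
        System.out.println(assign(64, 64).get(0));  // [0]
        // 400 partitions on 64 readers: still 64 readers, each juggling
        // 6-7 partitions -- no extra parallel instances, hence no speedup.
        System.out.println(assign(400, 64).get(0)); // [0, 64, 128, 192, 256, 320, 384]
    }
}
```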
>>> 
>>> It's still slow for a relatively large file, though.
>>> Please advise if there is something I can try to improve the cluster performance.
>>> Thanks + regards
>>> 
>>> From: Aljoscha Krettek <aljos...@apache.org>
>>> To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com> 
>>> Sent: Wednesday, September 14, 2016 1:48 AM
>>> Subject: Re: Fw: Flink Cluster Load Distribution Question
>>> 
>>> Hi,
>>> this is a different job from the Kafka Job that you have running, right?
>>> 
>>> Could you maybe post the code for that as well?
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>> On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:
>>> Hi Robert,
>>> Sure, I am forwarding it to user. Sorry about that; I followed the 
>>> "robot's" instructions :))
>>> Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB each). Let's call them 
>>> node1, 2, 3, 4.
>>> Flink is clustered, with node1 running the JM & a TM. Three more TMs run on 
>>> node2, 3, and 4 respectively.
>>> I have Beam running the Flink Runner underneath.
>>> The input data is read by Beam's TextIO() off a 1.6 GB file 
>>> containing roughly 22 million tuples.
>>> All nodes have identical flink-conf.yaml, masters & slaves contents as 
>>> follows:
>>> 
>>> flink-conf.yaml:
>>>     jobmanager.rpc.address: node1
>>>     jobmanager.rpc.port: 6123
>>>     jobmanager.heap.mb: 1024
>>>     taskmanager.heap.mb: 102400
>>>     taskmanager.numberOfTaskSlots: 16
>>>     taskmanager.memory.preallocate: false
>>>     parallelism.default: 64
>>>     jobmanager.web.port: 8081
>>>     taskmanager.network.numberOfBuffers: 4096
>>> 
>>> masters: 
>>> node1:8081
>>> 
>>> slaves:
>>> node1
>>> node2
>>> node3
>>> node4
>>> 
>>> Everything looks normal at ./start-cluster.sh & all daemons start on all 
>>> nodes.
>>> JM, TMs log files get generated on all nodes.
>>> Dashboard shows how all slots are being used.
>>> I deploy the Beam app to the cluster where JM is running at node1.
A *.out file gets generated as data is being processed. No *.out on the other 
>>> nodes, just node1 where I deployed the fat jar.
>>> I tail -f the *.out log on node1 (master). It starts fine... but slowly 
>>> degrades & becomes extremely slow.
>>> As we speak, I started the Beam app 13 hrs ago and it's still running.
>>> How can I prove that ALL NODES are involved in processing the data at the 
>>> same time, i.e. clustered?
>>> Do the above configurations look OK for reasonable performance?
>>> Given the above parameters, how can I improve the performance in this 
>>> cluster?
>>> What other information and/or dashboard screenshots are needed to clarify 
>>> this issue?
>>> I used these websites to do the configuration:
>>> Apache Flink: Cluster Setup
>>> Apache Flink: Configuration
>>> 
>>> In the second link, there is a recommendation for the following config 
>>> parameter, but it is not in the configuration file out of the box:
>>> taskmanager.network.bufferSizeInBytes
>>> Should I include it manually? Does it make any difference if the default 
>>> value, i.e. 32 KB, doesn't get picked up?
>>> Sorry, too many questions.
>>> Please let me know.
>>> I appreciate your help.
>>> Cheers,
>>> Amir-
>>> 
>>> ----- Forwarded Message -----
>>> From: Robert Metzger <rmetz...@apache.org>
>>> To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari 
>>> <amirto...@yahoo.com> 
>>> Sent: Tuesday, September 13, 2016 1:15 AM
>>> Subject: Re: Flink Cluster Load Distribution Question
>>> 
>>> Hi Amir,
>>> 
>>> I would recommend posting such questions to the user@flink mailing list in
>>> the future. This list is meant for development-related topics.
>>> 
>>> I think we need more details to understand why your application is not
>>> running properly. Can you quickly describe what your topology is doing?
>>> Are you setting the parallelism to a value >= 1?
>>> 
>>> Regards,
>>> Robert
>>> 
>>> 
>>> On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <
>>> amirto...@yahoo.com.invalid> wrote:
>>> 
>>> > Hi Colleagues, I just joined this forum. I have done everything possible to
>>> > get a 4-node Flink cluster to work properly & run a Beam app. It always
>>> > generates system-output logs (*.out) on only one node. It's so slow
>>> > for 4 nodes being there. It seems like the load is not distributed among all
>>> > 4 nodes but only one node, most of the time the one where the JM runs. I
>>> > ran/tested it on a single node, and it ran the same load even faster.
>>> > Not sure what's not being configured right.
>>> > 1- Why am I getting the SystemOut .out log on only one server? All nodes
>>> > get their TaskManager log files updated, though.
>>> > 2- Why don't I see the load being distributed among all 4 nodes, but only
>>> > one, all the time?
>>> > 3- Why does the Dashboard show a 0 (zero) for Send/Receive numbers for all
>>> > Task Managers?
>>> > The Dashboard shows all the right stuff. Top shows not much of the
>>> > resources being stressed on any of the nodes. I can share its contents if
>>> > it helps diagnose the issue. Thanks + I appreciate your valuable time,
>>> > response & help. Amir-
