Thanks.
Could you elaborate on writing to all partitions and not just one, please? How can I make sure?
I see all partitions consumed in the dashboard, and they get listed when my Beam app starts and the KafkaIO read operation gets associated with every single partition.
What else?
Thanks so much again.
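A partition being listed and assigned to a reader does not by itself show that records land in it. The following is only a toy model of that point, not the Kafka client: it assumes the producer maps a keyed record to hash(key) modulo the partition count and spreads keyless records round-robin, which is how I understand the default partitioner of that era to behave. The class name, method names and the stand-in hash (Kafka uses murmur2) are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Toy model of record-to-partition assignment (assumption: not the real Kafka client). */
final class PartitionSpread {

    /** Keyed record: hash of the key bytes modulo the partition count (stand-in hash). */
    static int partitionForKey(String key, int numPartitions) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, numPartitions);
    }

    /** Per-partition record counts when every record carries the same key. */
    static long[] sendWithConstantKey(String key, int records, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (int i = 0; i < records; i++) {
            counts[partitionForKey(key, numPartitions)]++;
        }
        return counts;
    }

    /** Per-partition record counts when keyless records are spread round-robin. */
    static long[] sendRoundRobin(int records, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (int i = 0; i < records; i++) {
            counts[i % numPartitions]++;
        }
        return counts;
    }

    /** Number of partitions that received at least one record. */
    static int nonEmptyPartitions(long[] counts) {
        int nonEmpty = 0;
        for (long c : counts) {
            if (c > 0) {
                nonEmpty++;
            }
        }
        return nonEmpty;
    }

    public static void main(String[] args) {
        int partitions = 64;   // partition count mentioned in the thread
        int records = 1000;    // arbitrary sample size for the illustration
        long[] sameKey = sendWithConstantKey("some-fixed-key", records, partitions);
        long[] spread = sendRoundRobin(records, partitions);
        System.out.println("constant key, non-empty partitions: " + nonEmptyPartitions(sameKey));
        System.out.println("round-robin,  non-empty partitions: " + nonEmptyPartitions(spread));
    }
}
```

If the real per-partition offsets or message counts from the Kafka metrics look like the constant-key case (one busy partition, the rest empty), only one parallel source instance will have work to do, regardless of how many partitions the reader lists at startup.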
Sent from my iPhone

> On Sep 18, 2016, at 10:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> good to see that you're making progress! The number of partitions in the Kafka topic should be >= the number of parallel Flink Slots and the parallelism with which you start the program. You also have to make sure to write to all partitions and not just to one.
>
> Cheers,
> Aljoscha
>
>> On Sun, 18 Sep 2016 at 21:50 amir bahmanyari <amirto...@yahoo.com> wrote:
>> Hi Aljoscha,
>> Thanks for your kind response.
>> - We are really benchmarking Beam & its Runners and it happened that we started with Flink. Therefore, any change we make to the approach must be a Beam code change that automatically affects the underlying runner.
>> - I changed the TextIO() back to KafkaIO() reading from a Kafka cluster instead of a single node. It's behaving fine except that the Kafka broker is running out of disk space & I am working around it as we speak.
>> - I removed Redis as per your recommendation & replaced it with Java ConcurrentHashMaps... Started to be a lot faster than before for sure. I cannot use a Flink-specific solution for this. It must be either an external something or a Beam solution or just a JVM solution. I picked ConcurrentHashMaps for now.
>> If I get by the Kafka broker disk space issue, and don't get an out of memory by the Flink servers in 3 hrs of runtime, I should be starting to see the light :))
>> Pls keep your fingers crossed as testing is underway for 10 expressways of Linear Road and that's 9 GB of tuples expected to be processed in 3.5 hrs.
>> - Kafka partitions in the Kafka topic = total number of slots available in the Flink servers. Should I alter that for better performance?
>>
>> Thanks Aljoscha & have a great weekend.
>> Amir-
>>
>> From: Aljoscha Krettek <aljos...@apache.org>
>> To: Amir Bahmanyari <amirto...@yahoo.com>; user <user@flink.apache.org>
>> Sent: Sunday, September 18, 2016 1:48 AM
>> Subject: Re: Flink Cluster Load Distribution Question
>>
>> This is not related to Flink, but in Beam you can read from a directory containing many files using something like this (from MinimalWordCount.java in Beam):
>>
>> TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")
>>
>> This will read all the files in the directory in parallel.
>>
>> For reading from Kafka I wrote this on another thread of yours:
>> Are you sure that all your Kafka partitions contain data. Did you have a look at the Kafka metrics to see how the individual partitions are filled? If only one partition contains data, then you will only read data in one parallel instance of the sources. How are you writing your data to Kafka?
>>
>> Flink/Beam should read from all partitions if all of them contain data. Could you please verify that all Kafka partitions contain data by looking at the metrics of your Kafka cluster, that would be a first step towards finding out where the problem lies.
>>
>> By the way, your code uses Beam in a highly non-idiomatic way. Interacting with an outside database, such as Redis, will always be the bottleneck in such a job. Flink provides an abstraction for dealing with state that is vastly superior to using an external system. We recently did a blog post about rewriting a similar streaming use case using Flink's internal state: http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe that's interesting for you.
>>
>> Cheers,
>> Aljoscha
>>
>> On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <amirto...@yahoo.com> wrote:
>> Thanks so much Aljoscha
>> Is there an example that shows how to read from multiple files accurately or from KafkaIO and get perfect parallelism pls?
>> Have a great weekend
>>
>> Sent from my iPhone
>>
>>> On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> One observation here is that you're only reading from one file. This will mean that you won't get any parallelism. Everything is executed on just one task/thread.
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Thu, 15 Sep 2016 at 01:24 amir bahmanyari <amirto...@yahoo.com> wrote:
>>> Hi Aljoscha,
>>> Experimenting on a relatively smaller file, with everything fixed except KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink cluster when reading tuples by TextIO().
>>> I understand the network involvement in reading from a Kafka topic etc., but 50% is significant.
>>> Also, I experimented with 64 partitions in the Kafka topic vs. 400. I get the exact same performance & increasing the topic partitions doesn't improve anything.
>>> I thought some of the 64 slots may get multiple-over-parallelism, really pushing it to its limit. 64 Kafka topic partitions & 400 Kafka topic partitions while #slots=64 is the same.
>>>
>>> It's still slow for a relatively large file though.
>>> Pls advise if there is something I can try to improve the cluster performance.
>>> Thanks+regards
>>>
>>> From: Aljoscha Krettek <aljos...@apache.org>
>>> To: user@flink.apache.org; amir bahmanyari <amirto...@yahoo.com>
>>> Sent: Wednesday, September 14, 2016 1:48 AM
>>> Subject: Re: Fw: Flink Cluster Load Distribution Question
>>>
>>> Hi,
>>> this is a different job from the Kafka Job that you have running, right?
>>>
>>> Could you maybe post the code for that as well?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:
>>> Hi Robert,
>>> Sure, I am forwarding it to user. Sorry about that. I followed the "robot's" instructions :))
>>> Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB). Let's call them node1, 2, 3, 4.
>>> Flink Clustered with node1 running JM & a TM.
>>> Three more TMs running on node2, 3, and 4 respectively.
>>> I have a Beam app running with the Flink Runner underneath.
>>> The input data is received by Beam TextIO() reading off 1.6 GB of data containing roughly 22 million tuples.
>>> All nodes have identical flink-conf.yaml, masters & slaves contents as follows:
>>>
>>> flink-conf.yaml:
>>> jobmanager.rpc.address: node1
>>> jobmanager.rpc.port: 6123
>>> jobmanager.heap.mb: 1024
>>> taskmanager.heap.mb: 102400
>>> taskmanager.numberOfTaskSlots: 16
>>> taskmanager.memory.preallocate: false
>>> parallelism.default: 64
>>> jobmanager.web.port: 8081
>>> taskmanager.network.numberOfBuffers: 4096
>>>
>>> masters:
>>> node1:8081
>>>
>>> slaves:
>>> node1
>>> node2
>>> node3
>>> node4
>>>
>>> Everything looks normal at ./start-cluster.sh & all daemons start on all nodes.
>>> JM, TMs log files get generated on all nodes.
>>> Dashboard shows how all slots are being used.
>>> I deploy the Beam app to the cluster where JM is running at node1.
>>> A *.out file gets generated as data is being processed. No *.out on other nodes, just node1 where I deployed the fat jar.
>>> I tail -f the *.out log on node1 (master). It starts fine... but slowly degrades & becomes extremely slow.
>>> As we speak, I started the Beam app 13 hrs ago and it's still running.
>>> How can I prove that ALL NODES are involved in processing the data at the same time, i.e. clustered?
>>> Do the above configurations look ok for a reasonable performance?
>>> Given the above parameters set, how can I improve the performance in this cluster?
>>> What other information and/or dashboard screenshots are needed to clarify this issue?
>>> I used these websites to do the configuration:
>>> Apache Flink: Cluster Setup
>>> Apache Flink: Configuration
>>>
>>> In the second link, there is a config recommendation for the following but this parameter is not in the configuration file out of the box:
>>> taskmanager.network.bufferSizeInBytes
>>> Should I include it manually? Does it make any difference if the default value, i.e. 32 KB, doesn't get picked up?
>>> Sorry, too many questions.
>>> Pls let me know.
>>> I appreciate your help.
>>> Cheers,
>>> Amir-
>>>
>>> ----- Forwarded Message -----
>>> From: Robert Metzger <rmetz...@apache.org>
>>> To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari <amirto...@yahoo.com>
>>> Sent: Tuesday, September 13, 2016 1:15 AM
>>> Subject: Re: Flink Cluster Load Distribution Question
>>>
>>> Hi Amir,
>>>
>>> I would recommend to post such questions to the user@flink mailing list in the future. This list is meant for development-related topics.
>>>
>>> I think we need more details to understand why your application is not running properly. Can you quickly describe what your topology is doing?
>>> Are you setting the parallelism to a value >= 1 ?
>>>
>>> Regards,
>>> Robert
>>>
>>>
>>> On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <amirto...@yahoo.com.invalid> wrote:
>>>
>>> > Hi Colleagues,
>>> > Just joined this forum. I have done everything possible to get a 4 nodes Flink cluster to work properly & run a Beam app. It always generates system-output logs (*.out) in only one node. It's sooooooooo slow for 4 nodes being there. Seems like the load is not distributed amongst all 4 nodes but only one node. Most of the time the one where JM runs. I run/tested it in a single node, and it took even faster to run the same load. Not sure what's not being configured right.
>>> > 1- Why am I getting SystemOut .out log in only one server?
>>> > All nodes get their TaskManager log files updated though.
>>> > 2- Why don't I see load being distributed amongst all 4 nodes, but only one all the time?
>>> > 3- Why does the Dashboard show a 0 (zero) for Send/Receive numbers per all Task Managers?
>>> > The Dashboard shows all the right stuff. Top shows not much of resources being stressed on any of the nodes. I can share its contents if it helps diagnosing the issue.
>>> > Thanks + I appreciate your valuable time, response & help.
>>> > Amir-
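The sizing figures quoted across this thread (4 TaskManagers, taskmanager.numberOfTaskSlots: 16, parallelism.default: 64, and 64 vs. 400 topic partitions) can be sanity-checked with a little arithmetic. The sketch below is a rough model under the assumption that the number of busy source instances is capped by the slots, by the job parallelism, and by the number of partitions that actually hold data. The class and method names are hypothetical and nothing here calls Flink or Kafka.

```java
/** Back-of-the-envelope check of slots, parallelism and partitions (hypothetical helper). */
final class ParallelismCheck {

    /** Total slots in the cluster: TaskManagers times slots per TaskManager. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /**
     * Rough upper bound on source instances that have work to do:
     * the smallest of the slot count, the job parallelism, and the
     * number of partitions that contain data.
     */
    static int effectiveSourceParallelism(int totalSlots, int parallelism, int partitionsWithData) {
        return Math.min(Math.min(totalSlots, parallelism), partitionsWithData);
    }

    public static void main(String[] args) {
        int slots = totalSlots(4, 16);   // 4 TMs x 16 slots, as in the quoted flink-conf.yaml
        int parallelism = 64;            // parallelism.default from the same file

        System.out.println("total slots: " + slots);
        System.out.println("64 partitions with data:  " + effectiveSourceParallelism(slots, parallelism, 64));
        System.out.println("400 partitions with data: " + effectiveSourceParallelism(slots, parallelism, 400));
        System.out.println("1 partition with data:    " + effectiveSourceParallelism(slots, parallelism, 1));
    }
}
```

Under this model, going from 64 to 400 partitions changes nothing once the slots are the limit, which is consistent with the observation reported in the thread, while a topic where only one partition holds data collapses to a single busy reader.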