Could you elaborate on writing to all partitions and not just one, please?
How can I make sure of that?
I see all partitions being consumed in the dashboard; they get listed when my Beam
app starts, and the KafkaIO read operation gets associated with every single partition.
What else should I check?
Thanks so much again.
Sent from my iPhone
> On Sep 18, 2016, at 10:30 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Good to see that you're making progress! The number of partitions in the
> Kafka topic should be >= the number of parallel Flink slots and the
> parallelism with which you start the program. You also have to make sure to
> write to all partitions and not just to one.
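A minimal, dependency-free sketch of what "write to all partitions" means on the producer side. It assumes a hypothetical producer that either sends every record with the same key (under Kafka's default partitioner, records with the same non-null key always land on the same partition) or picks the partition itself in round-robin order. `String.hashCode()` stands in for Kafka's real key hash, and the class name and the `"xway-0"` key are illustrative only. With the real kafka-clients producer, the round-robin variant corresponds to passing an explicit partition to `new ProducerRecord<>(topic, partition, key, value)`, or simply using keys that vary.

```java
import java.util.Arrays;

/** Hypothetical sketch: how a producer's partition choice spreads records over a topic. */
public class PartitionSpread {

    /** Every record carries the same key, so a key-hashing partitioner picks one partition.
     *  String.hashCode() is a stand-in for Kafka's actual key hash (an assumption). */
    public static int[] sameKey(int numPartitions, int numRecords, String key) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < numRecords; i++) {
            counts[Math.floorMod(key.hashCode(), numPartitions)]++;
        }
        return counts;
    }

    /** The producer chooses the partition explicitly, cycling through all of them. */
    public static int[] roundRobin(int numPartitions, int numRecords) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < numRecords; i++) {
            counts[i % numPartitions]++;
        }
        return counts;
    }

    /** Number of partitions that received at least one record. */
    public static long nonEmpty(int[] counts) {
        return Arrays.stream(counts).filter(c -> c > 0).count();
    }

    public static void main(String[] args) {
        int partitions = 64;
        int records = 6_400;
        System.out.println("same key:    " + nonEmpty(sameKey(partitions, records, "xway-0")));   // 1
        System.out.println("round robin: " + nonEmpty(roundRobin(partitions, records)));          // 64
    }
}
```

If the "same key" pattern matches how the topic is being filled, only one partition holds data, no matter how many partitions the topic has.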
>> On Sun, 18 Sep 2016 at 21:50 amir bahmanyari <amirto...@yahoo.com> wrote:
>> Hi Aljoscha,
>> Thanks for your kind response.
>> - We are really benchmarking Beam and its runners, and it happened that we
>> started with Flink.
>> Therefore, any change we make to the approach must be a Beam code change
>> that automatically affects the underlying runner.
>> - I changed the TextIO() back to KafkaIO(), reading from a Kafka cluster
>> instead of a single node. It's behaving fine, except that the Kafka broker is
>> running out of disk space, and I am working around that as we speak.
>> - I removed Redis as per your recommendation and replaced it with Java
>> ConcurrentHashMaps. It is a lot faster than before, for sure.
>> I cannot use a Flink-specific solution for this. It must be either an external
>> system, a Beam solution, or just a JVM solution. I picked
>> ConcurrentHashMaps for now.
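A minimal sketch of the in-JVM state described above, assuming the job keeps a per-key count (the counting use case and the `segment-N` keys are hypothetical). `ConcurrentHashMap.merge` makes each read-modify-write atomic per key, which is what makes the map safe to share between threads. Keep in mind that such a map lives inside a single JVM: each Flink TaskManager is a separate process, so the maps on node1 to node4 are independent of each other, and none of them is part of Flink's checkpointed state.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a per-key counter shared by threads within ONE JVM. */
public class LocalKeyCounts {

    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    /** Atomically adds one to the count for this key. */
    public void record(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    /** Current count for the key, or 0 if it was never recorded. */
    public long get(String key) {
        return counts.getOrDefault(key, 0L);
    }

    public static void main(String[] args) throws InterruptedException {
        LocalKeyCounts state = new LocalKeyCounts();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    state.record("segment-" + (i % 10));
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(state.get("segment-3")); // 4 threads x 1,000 = 4000
    }
}
```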
>> If I get past the Kafka broker disk space issue and don't get an out-of-memory
>> error on the Flink servers within 3 hrs of runtime, I should start seeing
>> the light :))
>> Please keep your fingers crossed, as testing is underway for 10 expressways of
>> Linear Road, and that's 9 GB of tuples expected to be processed in 3.5 hrs.
>> - The number of partitions in the Kafka topic = the total number of slots
>> available in the Flink servers. Should I change that for better performance?
>> Thanks Aljoscha & have a great weekend.
>> From: Aljoscha Krettek <aljos...@apache.org>
>> To: Amir Bahmanyari <amirto...@yahoo.com>; user <email@example.com>
>> Sent: Sunday, September 18, 2016 1:48 AM
>> Subject: Re: Flink Cluster Load Distribution Question
>> This is not related to Flink, but in Beam you can read from a directory
>> containing many files using something like this (from MinimalWordCount.java
>> in Beam):
>> This will read all the files in the directory in parallel.
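As a plain-JDK analogy to reading a directory in parallel (this is not Beam's TextIO API), the sketch below treats each file as an independent unit of work, so N files can be read by up to N threads at once, while a directory holding one file gives a single task all the work. The class name, the `tuples` temp-directory prefix, and the line-counting job are illustrative assumptions.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical sketch (plain JDK, not Beam): read every file in a directory in parallel. */
public class ParallelDirRead {

    /** Total line count over all regular files in dir; each file is one independent task. */
    public static long countLines(Path dir) {
        List<Path> files;
        try (Stream<Path> listing = Files.list(dir)) {
            files = listing.filter(Files::isRegularFile).collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // parallelStream() lets the common fork-join pool read different files on
        // different threads; a directory holding a single file yields a single task.
        return files.parallelStream().mapToLong(ParallelDirRead::linesIn).sum();
    }

    private static long linesIn(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates a temp directory with numFiles files, each containing the given lines. */
    public static Path writeSampleFiles(int numFiles, String... lines) {
        try {
            Path dir = Files.createTempDirectory("tuples");
            for (int i = 0; i < numFiles; i++) {
                Files.write(dir.resolve("part-" + i + ".txt"), Arrays.asList(lines));
            }
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = writeSampleFiles(4, "a", "b", "c");
        System.out.println(countLines(dir)); // 4 files x 3 lines = 12
    }
}
```

Splitting one large input file into several files is the simplest way to give a file-based source more than one unit of parallel work.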
>> For reading from Kafka I wrote this on another thread of yours:
>> Are you sure that all your Kafka partitions contain data? Did you have a
>> look at the Kafka metrics to see how the individual partitions are filled?
>> If only one partition contains data, then you will only read data in one
>> parallel instance of the sources. How are you writing your data to Kafka?
>> Flink/Beam should read from all partitions if all of them contain data.
>> Could you please verify that all Kafka partitions contain data by looking at
>> the metrics of your Kafka cluster, that would be a first step towards
>> finding out where the problem lies.
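The consumer-side consequence can be sketched the same way. The model below assumes, as a simplification, that partition `p` is read by source instance `p % parallelism`; the real KafkaIO/Flink assignment may differ. Under that model a partition being *assigned* to a reader (as listed at startup) says nothing about whether it holds data: if everything was written to one partition, one instance does all the work. It also shows why 400 partitions over 64 slots behaved like 64 partitions: each of the 64 instances just reads 6 or 7 partitions instead of 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: which parallel source instances actually receive data. */
public class SourceAssignment {

    /** Simplified assignment (an assumption): partition p goes to instance p % parallelism. */
    public static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> byInstance = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            byInstance.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            byInstance.get(p % parallelism).add(p);
        }
        return byInstance;
    }

    /** Counts instances that own at least one non-empty partition. */
    public static int busyInstances(long[] recordsPerPartition, int parallelism) {
        int busy = 0;
        for (List<Integer> owned : assign(recordsPerPartition.length, parallelism)) {
            for (int p : owned) {
                if (recordsPerPartition[p] > 0) {
                    busy++;
                    break;
                }
            }
        }
        return busy;
    }

    public static void main(String[] args) {
        long[] skewed = new long[64];
        skewed[0] = 6_400;                     // everything written to partition 0
        long[] spread = new long[64];
        Arrays.fill(spread, 100);              // 6,400 records spread evenly
        System.out.println(busyInstances(skewed, 64));  // 1
        System.out.println(busyInstances(spread, 64));  // 64
        List<List<Integer>> wide = assign(400, 64);
        System.out.println(wide.get(0).size() + " / " + wide.get(63).size()); // 7 / 6
    }
}
```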
>> By the way, your code uses Beam in a highly non-idiomatic way. Interacting
>> with an outside database, such as Redis, will always be the bottleneck in
>> such a job. Flink provides an abstraction for dealing with state that is
>> vastly superior to using an external system. We recently did a blog post
>> about rewriting a similar streaming use case using Flink's internal state:
>> http://data-artisans.com/extending-the-yahoo-streaming-benchmark/, maybe
>> that's interesting for you.
>> On Sat, 17 Sep 2016 at 19:30 Amir Bahmanyari <amirto...@yahoo.com> wrote:
>> Thanks so much, Aljoscha.
>> Is there an example that shows how to read from multiple files correctly, or
>> from KafkaIO, and get perfect parallelism, please?
>> Have a great weekend
>>> On Sep 17, 2016, at 5:39 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> One observation here is that you're only reading from one file. This will
>>> mean that you won't get any parallelism. Everything is executed on just one
>>> On Thu, 15 Sep 2016 at 01:24 amir bahmanyari <amirto...@yahoo.com> wrote:
>>> Hi Aljoscha,
>>> Experimenting on a relatively small file, with everything fixed except
>>> KafkaIO() vs. TextIO(), I get 50% better runtime performance in the Flink
>>> cluster when reading tuples with TextIO().
>>> I understand the network involvement in reading from a Kafka topic etc., but
>>> 50% is significant.
>>> Also, I experimented with 64 partitions in the Kafka topic vs. 400. I get
>>> exactly the same performance; increasing the topic partitions doesn't improve
>>> anything. I thought some of the 64 slots might get over-subscribed (several
>>> partitions each), really pushing them to their limit. With #slots=64,
>>> 64 Kafka topic partitions and 400 Kafka topic partitions perform the same.
>>> It's still slow for a relatively large file, though.
>>> Please advise if there is something I can try to improve the cluster performance.
>>> From: Aljoscha Krettek <aljos...@apache.org>
>>> To: firstname.lastname@example.org; amir bahmanyari <amirto...@yahoo.com>
>>> Sent: Wednesday, September 14, 2016 1:48 AM
>>> Subject: Re: Fw: Flink Cluster Load Distribution Question
>>> This is a different job from the Kafka job that you have running, right?
>>> Could you maybe post the code for that as well?
>>> On Tue, 13 Sep 2016 at 20:14 amir bahmanyari <amirto...@yahoo.com> wrote:
>>> Hi Robert,
>>> Sure, I am forwarding it to user. Sorry about that. I followed the
>>> "robot's" instructions :))
>>> Topology: 4 Azure A11 CentOS 7 nodes (16 cores, 110 GB). Let's call them
>>> node1, 2, 3, and 4.
>>> Flink is clustered with node1 running the JM and a TM. Three more TMs run on
>>> node2, 3, and 4, respectively.
>>> I have a Beam app running with the Flink runner underneath.
>>> The input data is read by Beam's TextIO() from a 1.6 GB file
>>> containing roughly 22 million tuples.
>>> All nodes have identical flink-conf.yaml, masters, and slaves contents, as follows:
>>> jobmanager.rpc.address: node1
>>> jobmanager.rpc.port: 6123
>>> jobmanager.heap.mb: 1024
>>> taskmanager.heap.mb: 102400
>>> taskmanager.numberOfTaskSlots: 16
>>> taskmanager.memory.preallocate: false
>>> parallelism.default: 64
>>> jobmanager.web.port: 8081
>>> taskmanager.network.numberOfBuffers: 4096
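The slot arithmetic behind "partitions >= slots >= parallelism" can be checked against the settings above. With 4 TaskManagers (one per node, per the topology described) and `taskmanager.numberOfTaskSlots: 16`, the cluster has 4 x 16 = 64 slots, which matches `parallelism.default: 64`, and the 64-partition topic mentioned later in the thread meets the partitions >= parallelism rule. The `parse` helper is a hypothetical flat `key: value` reader, not Flink's own configuration loader.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: slot arithmetic for the flink-conf.yaml values quoted above. */
public class SlotMath {

    /** Parses flat "key: value" lines (first colon splits); not a full YAML parser. */
    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new HashMap<>();
        for (String line : lines) {
            int colon = line.indexOf(':');
            if (colon > 0) {
                conf.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
            }
        }
        return conf;
    }

    /** Total slots = number of TaskManagers x slots per TaskManager. */
    public static int totalSlots(Map<String, String> conf, int numTaskManagers) {
        return numTaskManagers * Integer.parseInt(conf.get("taskmanager.numberOfTaskSlots"));
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(Arrays.asList(
                "taskmanager.numberOfTaskSlots: 16",
                "parallelism.default: 64"));
        int slots = totalSlots(conf, 4);                 // 4 TMs, one per node
        int parallelism = Integer.parseInt(conf.get("parallelism.default"));
        int kafkaPartitions = 64;                        // value reported in the thread
        System.out.println("slots=" + slots + " parallelism=" + parallelism);       // slots=64 parallelism=64
        System.out.println("parallelism <= slots: " + (parallelism <= slots));       // true
        System.out.println("partitions >= parallelism: " + (kafkaPartitions >= parallelism)); // true
    }
}
```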
>>> Everything looks normal after ./start-cluster.sh: all daemons start on all
>>> nodes, and JM and TM log files get generated on all nodes.
>>> The dashboard shows that all slots are being used.
>>> I deploy the Beam app to the cluster where the JM is running, at node1.
>>> A *.out file gets generated as data is being processed. There is no *.out on
>>> the other nodes, just on node1, where I deployed the fat jar.
>>> I tail -f the *.out log on node1 (master). It starts fine, but slowly
>>> degrades and becomes extremely slow.
>>> As we speak, the Beam app I started 13 hrs ago is still running.
>>> How can I prove that ALL NODES are involved in processing the data at the
>>> same time i.e. clustered?
>>> Do the above configurations look ok for a reasonable performance?
>>> Given the above parameter settings, how can I improve the performance in this
>>> What other information and/or dashboard screenshots are needed to clarify
>>> this issue?
>>> I used these websites to do the configuration:
>>> Apache Flink: Cluster Setup
>>> Apache Flink: Configuration
>>> In the second link, there is a config recommendation for the following but
>>> this parameter is not in the configuration file out of the box:
>>> Should I include it manually? Does it make any difference if the default
>>> value, i.e. 32 KB, doesn't get picked up?
>>> Sorry for so many questions.
>>> Please let me know.
>>> I appreciate your help.
>>> ----- Forwarded Message -----
>>> From: Robert Metzger <rmetz...@apache.org>
>>> To: "d...@flink.apache.org" <d...@flink.apache.org>; amir bahmanyari
>>> Sent: Tuesday, September 13, 2016 1:15 AM
>>> Subject: Re: Flink Cluster Load Distribution Question
>>> Hi Amir,
>>> I would recommend posting such questions to the user@flink mailing list in
>>> the future. This list is meant for development-related topics.
>>> I think we need more details to understand why your application is not
>>> running properly. Can you quickly describe what your topology is doing?
>>> Are you setting the parallelism to a value >= 1?
>>> On Tue, Sep 13, 2016 at 6:35 AM, amir bahmanyari <
>>> amirto...@yahoo.com.invalid> wrote:
>>> > Hi Colleagues,
>>> > I just joined this forum. I have done everything possible to get a 4-node
>>> > Flink cluster to work properly and run a Beam app. It always generates
>>> > system-output logs (*.out) on only one node. It's so slow for 4 nodes
>>> > being there. It seems the load is not distributed amongst all 4 nodes but
>>> > goes to only one node, most of the time the one where the JM runs.
>>> > I ran/tested it on a single node, and it ran even faster with the same
>>> > load. Not sure what's not being configured right.
>>> > 1- Why am I getting the SystemOut .out log on only one server? All nodes
>>> > get their TaskManager log files updated, though.
>>> > 2- Why don't I see the load being distributed amongst all 4 nodes, but only
>>> > on one node all the time?
>>> > 3- Why does the Dashboard show 0 (zero) for the Send/Receive numbers of
>>> > all Task Managers?
>>> > The Dashboard shows all the right stuff. Top shows that not many resources
>>> > are being stressed on any of the nodes. I can share its contents if it helps
>>> > diagnose the issue. Thanks, and I appreciate your valuable time, response,
>>> > and help.
>>> > Amir