Hi all,
I must say I'm very impressed by Flink and what it can do.

I was playing around with Flink operator parallelism and scalability,
and I have a few questions on this subject.

My setup is:
1. Flink 1.9.1
2. Docker job cluster, where each task manager has only one task slot. I'm
following [1].
3. env setup:
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
        env.setParallelism(1);
        env.setMaxParallelism(128);
        env.enableCheckpointing(10 * 60 * 1000);

Please mind that I am using operator chaining here. 

My pipeline setup:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture2.png>
 


As you can see, I have 7 operators (a few of them were actually chained,
which is fine), with different parallelism levels. Altogether this gives me
23 tasks.
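
Just to make clear what I mean by different parallelism levels, here is a
minimal, self-contained sketch (made-up operators and values, not my actual
job) of setting the parallelism per operator:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);   // default for operators that don't set their own

            env.fromElements(1L, 2L, 3L, 4L, 5L)          // non-parallel source -> 1 task
               .map(new MapFunction<Long, Long>() {
                   @Override
                   public Long map(Long value) {
                       return value * 2;
                   }
               }).setParallelism(6)                        // 6 map tasks
               .print().setParallelism(2);                 // 2 sink tasks

            // 1 + 6 + 2 = 9 tasks in this toy example: the sum of the parallelisms
            // of the operators that remain after chaining.
            env.execute("parallelism sketch");
        }
    }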


I've noticed that with the "one task manager = one task slot" approach I have
to have 6 task slots/task managers to be able to start this pipeline.

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture1.png>
 

If the number of task slots is lower than 6, the job is scheduled but never starts.

With 6 task slots everything works fine, and I must say I'm very impressed
with the way Flink balanced the data between task slots. The data was
distributed very evenly between operator instances/tasks.

In this setup (7 operators, 23 tasks and 6 task slots), some task slots have
to be reused by more than one operator. While inspecting the UI I found
examples of such operators. This is what I was expecting, though.
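
As far as I understand, this reuse is the default slot sharing behaviour; if
I wanted to keep an operator out of the shared slots I could give it its own
slot sharing group, something like this (the "events" stream and the group
name are made up, I haven't actually used this here):

    events
        .map(String::toUpperCase)
        .slotSharingGroup("heavy")   // this operator (and, by default, everything
                                     // downstream of it) no longer shares slots with the rest
        .setParallelism(6);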

However, I was a little surprised after I added one additional task manager
(hence one new task slot):

<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t2311/Capture3.png>
 

After adding the new resources, Flink did not rebalance/redistribute the
graph, so this host was sitting there doing nothing. Even after putting some
load on the cluster, this node was still not used.

 
*After doing this exercise I have a few questions:*

1. It seems that the number of task slots must be equal to or greater than the
maximum parallelism used in the pipeline. In my case it was 6. When I changed
the parallelism of one of the operators to 7, I had to have 7 task slots (task
managers in my setup) to even be able to start the job.
Is this the case?

2. What can I do to use the extra node that was spawned while the job was
running?
In other words, if I see that one of my nodes has too much load, what can I
do? Please mind that I'm using a keyBy/hashing function in my pipeline, and in
my tests I had around 5000 unique keys.
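
To illustrate, the keyed part of my pipeline looks roughly like this (just a
toy stand-in, the real keys are ~5000 distinct business ids):

    env.setMaxParallelism(128);            // 128 key groups; each key is hashed into one of them

    env.fromElements("a", "b", "c", "a", "b")
       .keyBy(value -> value)              // same key -> same key group -> same parallel instance
       .map(String::toUpperCase)
       .setParallelism(6)
       .print();

So, as I understand it, with far more keys than parallel instances the load
should spread out, but a single heavy key would still stay on one instance.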

I've tried to use the REST API to call "rescale", but I got this response:
302 {"errors":["Rescaling is temporarily disabled. See FLINK-12312."]}

Thanks.

[1]
https://github.com/apache/flink/blob/release-1.9/flink-container/docker/README.md


