This is a known issue that will be fixed in 1.9.2/1.10.0; see
On 15/01/2020 10:07, HuWeihua wrote:
We encountered some problems during the upgrade from Flink 1.5 to
Flink 1.9. Flink's scheduling strategy has changed. Flink 1.9 prefers
centralized scheduling, while Flink 1.5 prefers decentralized
scheduling. This change has caused a resource imbalance and blocked our
upgrade plan. We have thousands of jobs that need to be upgraded.
Consider a job with 10 sources and 100 sinks. Each source needs 1 core
and each sink needs 0.1 core.
We run this job on YARN with numberOfTaskSlots set to 10 and
yarn.containers.vcores set to 2.
When using Flink-1.5:
Each TaskManager runs 1 source and 9 sinks, which need 1.9 cores in
total, so the job works very well with this configuration.
The scheduling result is shown in Figure 1.
When using Flink-1.9:
The 10 sources are scheduled to one TaskManager and the 100 sinks
are scheduled to 10 other TaskManagers. The scheduling result is shown
in Figure 2.
In this scenario, the TaskManager that runs the sources needs 10 cores,
while the other TaskManagers need 1 core each. But all TaskManagers must
be configured identically, so we need 11 TaskManagers with 10 cores each.
This wastes (10-2)*11 = 88 cores more than Flink 1.5.
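The arithmetic above can be checked with a small sketch. It is only an
illustration, not Flink code: the function name and the layout lists are
made up for this example, and the 11th TaskManager in the Flink 1.5 layout
(holding the remaining 10 sinks) is an assumption inferred from the
11-TaskManager total used in the calculation.

```python
# Core demand is tracked in tenths of a core to avoid floating-point rounding.
SOURCE_TENTHS = 10  # each source needs 1 core
SINK_TENTHS = 1     # each sink needs 0.1 core


def cluster_cores(layout):
    """Return (vcores per TaskManager, total vcores).

    layout is a list of (sources, sinks) pairs, one pair per TaskManager.
    All TaskManagers share one size, so the busiest one sets the size.
    """
    peak = max(s * SOURCE_TENTHS + k * SINK_TENTHS for s, k in layout)
    per_tm = -(-peak // 10)  # ceiling division: round up to whole vcores
    return per_tm, per_tm * len(layout)


# Flink 1.5 style: 10 TaskManagers with 1 source + 9 sinks each.
# Assumption: an 11th TaskManager holds the remaining 10 sinks.
spread = [(1, 9)] * 10 + [(0, 10)]

# Flink 1.9 style: all sources on one TaskManager, sinks on 10 others.
packed = [(10, 0)] + [(0, 10)] * 10

spread_per_tm, spread_total = cluster_cores(spread)
packed_per_tm, packed_total = cluster_cores(packed)
extra = packed_total - spread_total

print(spread_per_tm, spread_total)  # 2 22
print(packed_per_tm, packed_total)  # 10 110
print(extra)                        # 88
```

Under these assumptions the spread layout fits in 11 TaskManagers of 2
vcores, while the packed layout needs 11 TaskManagers of 10 vcores, which
matches the 88 extra cores.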
In addition to the waste of resources, we also encountered other
problems caused by the centralized scheduling strategy.
1. Network bandwidth. Tasks of the same type are scheduled to a single
TaskManager, causing too much network traffic on that machine.
2. Some jobs need to sink to the local agent. After centralized
scheduling, the insufficient processing capacity of a single
machine causes a consumption backlog.
In summary, we think a decentralized scheduling strategy is necessary.
Figure 1. Flink 1.5 scheduling result
Figure 2. Flink 1.9 scheduling result