Zhanghao Chen created FLINK-33123:
-------------------------------------

             Summary: Wrong dynamic replacement of partitioner from FORWARD to 
REBALANCE for autoscaler and adaptive scheduler
                 Key: FLINK-33123
                 URL: https://issues.apache.org/jira/browse/FLINK-33123
             Project: Flink
          Issue Type: Bug
          Components: Autoscaler, Runtime / Coordination
    Affects Versions: 1.17.0, 1.18.0
            Reporter: Zhanghao Chen


*Background*

https://issues.apache.org/jira/browse/FLINK-30213 reported that the edge is 
wrong when the parallelism is changed for a vertex with a FORWARD edge. This 
affects both the autoscaler and the adaptive scheduler, where the vertex 
parallelism can be changed dynamically. The fix dynamically replaces the 
partitioner from FORWARD to REBALANCE on task deployment in {{StreamTask}}: 

{{    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(}}
{{            Environment environment, NonChainedOutput streamOutput, int outputIndex) {}}
{{        if (streamOutput.getPartitioner() instanceof ForwardPartitioner}}
{{                && environment.getWriter(outputIndex).getNumberOfSubpartitions()}}
{{                        != environment.getTaskInfo().getNumberOfParallelSubtasks()) {}}
{{            LOG.debug(}}
{{                    "Replacing forward partitioner with rebalance for {}",}}
{{                    environment.getTaskInfo().getTaskNameWithSubtasks());}}
{{            streamOutput.setPartitioner(new RebalancePartitioner<>());}}
{{        }}}
{{    }}}

*Problem*

Unfortunately, the fix is still buggy in two aspects:
 # The connections between upstream and downstream tasks are determined by the 
distribution type of the partitioner when the execution graph is generated on 
the JM side. When the edge is FORWARD, the distribution type is POINTWISE, and 
Flink tries to distribute subpartitions evenly across all downstream tasks. To 
change the edge to REBALANCE, the distribution type has to be changed to 
ALL_TO_ALL so that all-to-all connections are made between upstream and 
downstream tasks. However, the fix did not change the distribution type, so 
the network connections are set up incorrectly.
 # The FORWARD partitioner is replaced if 
environment.getWriter(outputIndex).getNumberOfSubpartitions() does not equal 
the task parallelism. However, the number of subpartitions here equals the 
number of downstream tasks of this particular task, which is also determined by 
the distribution type of the partitioner when the execution graph is generated 
on the JM side. When ceil(downstream task parallelism / upstream task 
parallelism) = upstream task parallelism, the number of subpartitions equals 
the task parallelism and the partitioner is left unchanged. In fact, for a 
normal job with a FORWARD edge and no autoscaling action, the partitioner is 
changed to REBALANCE internally whenever the parallelism is greater than 1, 
because the number of subpartitions always equals 1 in this case.
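
The arithmetic behind the second point can be sketched in Java. This is a 
simplified model, not Flink code: the class ForwardEdgeSketch and all of its 
methods are hypothetical, and the even POINTWISE split is an assumption about 
how downstream tasks are spread over upstream tasks. Only the final comparison 
mirrors the condition in the fix quoted above.

```java
// Simplified model for illustration; none of these names are Flink APIs.
final class ForwardEdgeSketch {

    static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    // Subpartitions of one upstream subtask under an assumed even POINTWISE split.
    // With no more downstream than upstream subtasks, each upstream subtask
    // feeds exactly one downstream subtask.
    public static int pointwiseSubpartitions(
            int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        if (downstreamParallelism <= upstreamParallelism) {
            return 1;
        }
        int start = ceilDiv(upstreamIndex * downstreamParallelism, upstreamParallelism);
        int end = ceilDiv((upstreamIndex + 1) * downstreamParallelism, upstreamParallelism);
        return end - start;
    }

    // Under ALL_TO_ALL every upstream subtask feeds every downstream subtask.
    public static int allToAllSubpartitions(int downstreamParallelism) {
        return downstreamParallelism;
    }

    // Same comparison as the fix: subpartition count vs. the task's own parallelism.
    public static boolean wouldReplaceForward(int numberOfSubpartitions, int taskParallelism) {
        return numberOfSubpartitions != taskParallelism;
    }

    public static void main(String[] args) {
        // Plain FORWARD edge, parallelism 4 on both sides, no rescaling.
        int plain = pointwiseSubpartitions(0, 4, 4);
        System.out.println("4 -> 4: subpartitions=" + plain
                + ", replaced=" + wouldReplaceForward(plain, 4));

        // Downstream rescaled from 2 to 4 while upstream stays at 2.
        int rescaled = pointwiseSubpartitions(0, 2, 4);
        System.out.println("2 -> 4: subpartitions=" + rescaled
                + ", replaced=" + wouldReplaceForward(rescaled, 2));

        // What a real REBALANCE edge would need for the rescaled case.
        System.out.println("2 -> 4 all-to-all: subpartitions=" + allToAllSubpartitions(4));
    }
}
```

In this model the unscaled 4 -> 4 job satisfies the replacement condition, 
while the rescaled 2 -> 4 job does not. That is the opposite of what the fix 
intended.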

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
