GitHub user WeichenXu123 opened a pull request:

    https://github.com/apache/spark/pull/15730

    [SPARK-18218][ML][MLLib] Optimize BlockMatrix multiplication and move it 
into ml package

    ## What changes were proposed in this pull request?
    
    ### The problem in current block matrix mulitiplication
    
    As in JIRA https://issues.apache.org/jira/browse/SPARK-18218 described, 
block matrix multiplication in spark may cause some problem, suppose we have 
M*N dimensions matrix A multiply N*P dimensions matrix B, when N is much larger 
than M and P, then the following problem may occur:
    - when the middle dimension N is too large, it will cause reducer OOM.
    - even if OOM do not occur, it will still cause parallism too low.
    - when N is much large than M and P, and matrix A and B have many 
partitions, it may cause too many partition on M and P dimension, it will cause 
much larger shuffled data size. (I will expain this in detail in the following.)
    
    ### Key point of my improvement
    
    In this PR, I introduce `midDimSplitNum` parameter, and improve the 
algorithm, to resolve this problem.
    
    In order to understand the improvement in this PR, first let me give a 
simple case to explain how the current mulitiplication works and what cause the 
problems above:
    
    suppose we have block matrix A, contains 200 blocks (2 rowBlockNum * 100 
colBlockNum), blocks arranged in 2 rows, 100 cols:
    ```
    A00 A01 A02 ... A0,99
    A10 A11 A12 ... A1,99
    ```
    and we have block matrix B, also contains 200 blocks (100 rowBlockNum * 2 
colBlockNum), blocks arranged in 100 rows, 2 cols:
    ```
    B00    B01
    B10    B11
    B20    B21
    ...
    B99,0  B99,1
    ```
    Suppose all blocks in the two matrices are dense for now.
    Now we call A.multiply(B), suppose the generated `resultPartitioner` 
contains 2 rowPartitions and 2 colPartitions (can't be more partitions because 
the result matrix only contains 2 * 2 blocks), the current algorithm will 
contains two shuffle steps:
    
    **step-1**
    Step-1 will generate 4 reducer, I tag them as reducer-00, reducer-01, 
reducer-10, reducer-11, and shuffle data as following:
    ```
    A00 A01 A02 ... A0,99
    B00 B10 B20 ... B99,0    shuffled into reducer-00
    
    A00 A01 A02 ... A0,99
    B01 B11 B21 ... B99,1    shuffled into reducer-01
    
    A10 A11 A12 ... A1,99
    B00 B10 B20 ... B99,0    shuffled into reducer-10
    
    A10 A11 A12 ... A1,99
    B01 B11 B21 ... B99,1    shuffled into reducer-11
    ```
    
    and the shuffling above is a `cogroup` transform, note that each reducer 
contains **only one group**.
    
    **step-2**
    Step-2 will do an `aggregateByKey` transform on the result of step-1, will 
also generate 4 reducers, and generate the final result RDD, contains 4 
partitions, each partition contains one block.
    
    The main problems are in step-1. Now we have only 4 reducers, but matrix A 
and B have 400 blocks in total, obviously the reducer number is too small.
    and, we can see that, each reducer contains only one group(the group 
concept in `coGroup` transform), each group contains 200 blocks. This is 
terrible because we know that `coGroup` transformer will load each group into 
memory when computing. It is un-extensable in the algorithm level. Suppose 
matrix A has 10000 cols blocks or more instead of 100? Than each reducer will 
load 20000 blocks into memory. It will easily cause reducer OOM.
    
    This PR try to resolve the problem described above.
    When matrix A with dimension M * N multiply matrix B with dimension N * P, 
the middle dimension N is the keypoint. If N is large, the current 
mulitiplication implementation works badly.
    In this PR, I introduce a `midDimSplitNum` parameter, represent how many 
splits it will cut on the middle dimension N.
    Still using the example described above, now we set `midDimSplitNum = 10`, 
now we can generate 40 reducers in **stage-1**:
    
    the reducer-ij above now will be splited into 10 reducers: reducer-ij0, 
reducer-ij1, ... reducer-ij9, each reducer will receive 20 blocks.
    now the shuffle works as following:
    
    **reducer-000 to reducer-009**
    ```
    A0,0 A0,10 A0,20 ... A0,90
    B0,0 B10,0 B20,0 ... B90,0    shuffled into reducer-000
    
    A0,1 A0,11 A0,21 ... A0,91
    B1,0 B11,0 B21,0 ... B91,0    shuffled into reducer-001
    
    A0,2 A0,12 A0,22 ... A0,92
    B2,0 B12,0 B22,0 ... B92,0    shuffled into reducer-002
    
    ...
    
    A0,9 A0,19 A0,29 ... A0,99
    B9,0 B19,0 B29,0 ... B99,0    shuffled into reducer-009
    ```
    
    **reducer-010 to reducer-019**
    ```
    A0,0 A0,10 A0,20 ... A0,90
    B0,1 B10,1 B20,1 ... B90,1    shuffled into reducer-010
    
    A0,1 A0,11 A0,21 ... A0,91
    B1,1 B11,1 B21,1 ... B91,1    shuffled into reducer-011
    
    A0,2 A0,12 A0,22 ... A0,92
    B2,1 B12,1 B22,1 ... B92,1    shuffled into reducer-012
    
    ...
    
    A0,9 A0,19 A0,29 ... A0,99
    B9,1 B19,1 B29,1 ... B99,1    shuffled into reducer-019
    ```
    
    **reducer-100 to reducer-109** and **reducer-110 to reducer-119** is 
similar to the above, I omit to write them out.
    
    ### API for this optimized algorithm
    
    As `BlockMatrix` haven't been moved into `ml` package, I directly implement 
the `BlockMatrix` in `ml` package, with this optimization. The old 
implementation just leave it as it is.
    
    Keep all API the same with the old one, but I add a new API as following:
    ```
      def multiply(
          other: BlockMatrix,
          suggestedShuffleRowPartitions: Int,
          suggestedShuffleColPartitions: Int,
          midDimSplitNum: Int,
          suggestedResulRowPartitions: Int,
          suggestedResulColPartitions: Int
          ): BlockMatrix
    ```
    
    I add 5 extra parameters, may be here need some discuss to let the 
interface more elegant, but let me explain the parameters first:
    as expained above, the multiplication need two steps shuffle, the following 
three parameters are used for controlling step-1 shuffle:
    ```
        suggestedShuffleRowPartitions: Int  // rowPartition number on 
left-matrix on step-1 shuffle (suggested value)
        suggestedShuffleColPartitions: Int  // colPartition number on 
right-matrix on step-1 shuffle (suggested value)
        midDimSplitNum: Int                                     // middle 
dimension split number, expained above
    ```
    the following two parameters are used for result matrix partitioning(used 
in step-2 shuffle), it has no relation to the three parameters above.
    ```
        suggestedResulRowPartitions: Int        // rowPartition number on 
result matrix (suggested value)
        suggestedResulColPartitions: Int        // colPartition number on 
result matrix (suggested value)
    ```
    
    I also add other APIs of `mllib.linalg.distributed.BlockMatrix` into 
`ml.linalg.distributed.BlockMatrix`, but not contains
    `toIndexRowMatrix`, `toCoodinateMatrix`, `add`, `substract`, `blockMap`, 
after I check whether there are some optimization can be done I will add them 
in following PR.
    
    ### Shuffled data size analysis
    
    The optimization has some subtle influence on the total shuffled data size. 
Appropriate `midDimSplitNum` will significantly reduce the shuffled data size,
    but too large `midDimSplitNum` may increase the shuffled data in reverse. 
For now I don't want to introduce formula to make thing too complex, I only use 
a simple case to represent it here:
    
    Suppose we have two same size square matrices X (20 rowBlockNum * 20 
colBlockNum) and Y (20 rowBlockNum * 20 colBlockNum), **X and Y both 
partitioned in 20 rowPartitions and 20 colPartitions**.
    when we call old mulitiplication implementation in `mllib`, it will 
generate result matrix Z (20 rowBlockNum * 20 colBlockNum),
    **the two step shuffle both have 20 * 20 = 400 reducers, so the parallism = 
20 * 20 = 400, the result matrix will be partitioned into 20 * 20 = 400 
partitions**.
    
    now we can calculate the shuffled data size(I use block as the data size 
unit), and I also compute the step-1 shuffling reducer memory requirement, 
because it may require a large memory and it should be an important factor for 
optimization(for simplicity, I do not consider map-side aggregation memory 
usage for now, and currently each reducer will contains only one group, the 
step-1 reducer memory requirement approximate equals all blocks size in this 
group):
    **step-1 shuffled data** = 20 * 20 * (20 + 20) = 16000 (blocks)
    **step-2 shuffled data** = 20 * 20 = 400 (blocks) (Note that 
`aggregateByKey` contains map-side aggregation, we only need to compute the 
actually shuffled data after map-side aggregation)
    **step-1 reducer memory requirement** ~= (1 + 1) * 20 = 40 (blocks)
    
    Using optimized implementation, **in order to do a fair comparison, I let 
the parallism and result partitioner to be the same.**
    Under such restriction, I set `midDimSplitNum = 4, 
suggestedShuffleRowPartitions = 10, suggestedShuffleColPartitions = 10, 
suggestedResulRowPartitions = 20, suggestedResulColPartitions = 20`. Under such 
setting, the two shuffle steps will all have 400 parallism and the result 
matrix will also have 20 * 20 = 400 partitions.
    
    now we can calculate the shuffled data size under optimized implementation:
    **step-1 shuffled data** = 10 * 10 * (2 * 20 + 2 * 20) = 8000 (blocks)
    **step-2 shuffled data** = 10 * 10 * (2 * 2) * 4 = 1600 (blocks)
    **step-1 reducer memory requirement** ~= (2 + 2) * (20 / 4) = 20 (blocks)
    
    Now I change the parameters as `midDimSplitNum = 8, 
suggestedShuffleRowPartitions = 5, suggestedShuffleColPartitions = 10, 
suggestedResulRowPartitions = 20, suggestedResulColPartitions = 20`, it will 
still keep `parallism` and the result partitions the same:
    **step-1 shuffled data** = 5 * 10 * (4 * 20 + 2 * 20) = 6000 (blocks)
    **step-2 shuffled data** = 5 * 10 * (4 * 2) * 8 = 3200 (blocks)
    **step-1 reducer memory requirement** ~= (4 + 2) * (20 / 8) = 15 (blocks)
    
    Now I change the parameters as `midDimSplitNum = 16, 
suggestedShuffleRowPartitions = 5, suggestedShuffleColPartitions = 5, 
suggestedResulRowPartitions = 20, suggestedResulColPartitions = 20`, it will 
still keep `parallism` and the result partitions the same:
    **step-1 shuffled data** = 5 * 5 * (4 * 20 + 4 * 20) = 4000 (blocks)
    **step-2 shuffled data** = 5 * 5 * (4 * 4) * 16 = 6400 (blocks)
    **step-1 reducer memory requirement** ~= (4 + 4) * 20 / 16 = 10 (blocks)
    
    Now I make a table to show that how different parameters influence the 
shuffled data size, **Note that the comparison have the same restriction that 
`parallism = 400` and result matrix must be partitioned into 20 rows * 20 cols 
partitions.**, when `midDimSplitNum == 1` it is the original implementation in 
`mllib`.
    
    | midDimSplitNum | step-1 shuffle size | step-2 shuffle size | total 
shuffle size | step-1 reducer memory | 
    | - | - | - | - | - |
    | 1(original) | 16000 blocks | 400 blocks | 16400 blocks | 40 blocks |
    | 4 | 8000 blocks | 1600 blocks | 9600 blocks | 20 blocks |
    | 8 | 6000 blocks | 3200 blocks | 9200 blocks | 10 blocks |
    | 16 | 4000 blocks | 6400 blocks | 10400 blocks | 10 blocks |
    
    Now from the table we can found, under the same restriction **`parallism = 
400` and result matrix must be partitioned into 20 rows * 20 cols partitions**, 
when we increase the `midDimSplitNum`, **step-1 shuffle size decrease, step-2 
shuffle size increase, and step-1 reducer memory requirement decrease**.
    And we can find that, when `midDimSplitNum = 8`, the shuffle data size is 
only **about 56% of the original algorithm**, it is a huge improvement.
    
    ### Performance test
    
    Will be added ASAP.
    
    ## How was this patch tested?
    
    Test suites added.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/WeichenXu123/spark optim_block_matrix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15730.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15730
    
----
commit a6bc12de5e724bddc400b93db86b18b2f706cd79
Author: WeichenXu <[email protected]>
Date:   2016-11-01T02:00:39Z

    update.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to