Github user uzadude commented on the issue:
https://github.com/apache/spark/pull/14068
Sure.
The current method for multiplying distributed block matrices starts by
deciding which block should be shuffled to which partition to do the actual
multiplications. This stage is implemented in the function
"simulateMultiply" in the BlockMatrix class.
Specifically, it iterates over all the blocks in the left matrix (let's
assume there are NxN blocks) and, for each block, iterates over all the blocks
in the right matrix (also NxN) to check for potential matches. This process is
obviously on the order of O(N^4). The inner iteration can be improved
trivially. It currently filters on a predetermined condition:
```scala
val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
  val rightCounterparts = rightMatrix.filter(_._1 == colIndex)
  val partitions = rightCounterparts.map(b =>
    partitioner.getPartition((rowIndex, b._2)))
  ((rowIndex, colIndex), partitions.toSet)
}.toMap
```
More specifically, this part:
```scala
val rightCounterparts = rightMatrix.filter(_._1 == colIndex)
```
So if we were to cache this check, for example in a HashMap keyed by the right block's row index:
```scala
val rightCounterpartsHelper = rightMatrix.groupBy(_._1).map {
  case (rowIndex, arr) => (rowIndex, arr.map(b => b._2))
}
```
We can omit the inner filter and just look up the cached result:
```scala
val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
  ((rowIndex, colIndex),
    rightCounterpartsHelper.getOrElse(colIndex, Array()).map(b =>
      partitioner.getPartition((rowIndex, b))).toSet)
}.toMap
```
And to put it all together:
```scala
val rightCounterpartsHelper = rightMatrix.groupBy(_._1).map {
  case (rowIndex, arr) => (rowIndex, arr.map(b => b._2))
}
val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
  ((rowIndex, colIndex),
    rightCounterpartsHelper.getOrElse(colIndex, Array()).map(b =>
      partitioner.getPartition((rowIndex, b))).toSet)
}.toMap
```
The same trick also applies to rightDestinations.
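For anyone who wants to check the equivalence outside Spark, here is a minimal standalone sketch of both variants. It is not the code from the PR: the object name `SimulateMultiplySketch`, the toy `getPartition` function (a stand-in for the real partitioner) and the sample block indices are all assumptions made for illustration. The grouped variant builds the lookup map in one pass over the right blocks, so each left block only touches its actual counterparts instead of scanning the whole right matrix.

```scala
object SimulateMultiplySketch {
  type BlockIndex = (Int, Int)

  // Assumption: a toy partitioner for non-negative indices,
  // standing in for partitioner.getPartition in BlockMatrix.
  def getPartition(key: BlockIndex, numPartitions: Int): Int = {
    val (i, j) = key
    (i * 31 + j) % numPartitions
  }

  // Original approach: for every left block, scan every right block.
  def leftDestinationsNaive(
      left: Array[BlockIndex],
      right: Array[BlockIndex],
      numPartitions: Int): Map[BlockIndex, Set[Int]] =
    left.map { case (rowIndex, colIndex) =>
      val rightCounterparts = right.filter(_._1 == colIndex)
      val partitions = rightCounterparts.map(b =>
        getPartition((rowIndex, b._2), numPartitions))
      ((rowIndex, colIndex), partitions.toSet)
    }.toMap

  // Proposed approach: group the right blocks by row index once,
  // then replace the inner filter with a map lookup.
  def leftDestinationsGrouped(
      left: Array[BlockIndex],
      right: Array[BlockIndex],
      numPartitions: Int): Map[BlockIndex, Set[Int]] = {
    val rightCounterpartsHelper: Map[Int, Array[Int]] =
      right.groupBy(_._1).map { case (rowIndex, arr) =>
        (rowIndex, arr.map(_._2))
      }
    left.map { case (rowIndex, colIndex) =>
      val partitions = rightCounterpartsHelper
        .getOrElse(colIndex, Array.empty[Int])
        .map(b => getPartition((rowIndex, b), numPartitions))
      ((rowIndex, colIndex), partitions.toSet)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sparse block layouts, given as (rowIndex, colIndex).
    val left = Array((0, 0), (0, 2), (1, 1), (2, 0))
    val right = Array((0, 1), (1, 0), (1, 2), (2, 2))
    val naive = leftDestinationsNaive(left, right, 4)
    val grouped = leftDestinationsGrouped(left, right, 4)
    // Both variants should assign every left block the same partitions.
    assert(naive == grouped)
    println(grouped)
  }
}
```

The sketch uses `Array.empty[Int]` as the lookup default so the element type is explicit; a left block with no counterpart in the right matrix then simply maps to an empty set of partitions.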
As I mentioned above, we encountered this while trying to multiply big
sparse matrices: it got stuck on the driver for a very long time (~1.5
hours), and we had to do it iteratively, so the whole process took many hours.
After the fix, this part was reduced to a few seconds.