Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/17459#discussion_r118818160
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
---
@@ -108,8 +108,69 @@ class IndexedRowMatrix @Since("1.0.0") (
*/
@Since("1.3.0")
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
- // TODO: This implementation may be optimized
- toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
+ require(rowsPerBlock > 0,
+ s"rowsPerBlock needs to be greater than 0. rowsPerBlock:
$rowsPerBlock")
+ require(colsPerBlock > 0,
+ s"colsPerBlock needs to be greater than 0. colsPerBlock:
$colsPerBlock")
+
+ // Since block matrices require an integer row index
+ require(numRows() / rowsPerBlock.toDouble <= Int.MaxValue,
+ "Number of rows divided by rowsPerBlock cannot exceed maximum
integer.")
+
+ val m = numRows()
+ val n = numCols()
+ // The remainder calculations only matter when m % rowsPerBlock != 0
or n % colsPerBlock != 0
+ val remainderRowBlockIndex = m / rowsPerBlock
+ val remainderColBlockIndex = n / colsPerBlock
+ val remainderRowBlockSize = (m % rowsPerBlock).toInt
+ val remainderColBlockSize = (n % colsPerBlock).toInt
+ val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
+ val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
+
+ val blocks = rows.flatMap { ir: IndexedRow =>
+ val blockRow = ir.index / rowsPerBlock
+ val rowInBlock = ir.index % rowsPerBlock
+
+ ir.vector match {
+ case SparseVector(size, indices, values) =>
+ indices.zip(values).map { case (index, value) =>
+ val blockColumn = index / colsPerBlock
+ val columnInBlock = index % colsPerBlock
+ ((blockRow.toInt, blockColumn.toInt), (rowInBlock.toInt,
Array((value, columnInBlock))))
+ }
+ case DenseVector(values) =>
+ values.grouped(colsPerBlock)
+ .zipWithIndex
+ .map { case (values, blockColumn) =>
+ ((blockRow.toInt, blockColumn), (rowInBlock.toInt,
values.zipWithIndex))
+ }
+ }
+ }.groupByKey(GridPartitioner(numRowBlocks, numColBlocks,
rows.getNumPartitions)).map {
+ case ((blockRow, blockColumn), itr) =>
+ val actualNumRows =
+ if (blockRow == remainderRowBlockIndex) remainderRowBlockSize
else rowsPerBlock
+ val actualNumColumns =
+ if (blockColumn == remainderColBlockIndex) remainderColBlockSize
else colsPerBlock
+
+ val arraySize = actualNumRows * actualNumColumns
+ val matrixAsArray = new Array[Double](arraySize)
+ var countForValues = 0
+ itr.foreach { case (rowWithinBlock, valuesWithColumns) =>
+ valuesWithColumns.foreach { case (value, columnWithinBlock) =>
+ matrixAsArray.update(columnWithinBlock * actualNumRows +
rowWithinBlock, value)
+ countForValues += 1
+ }
+ }
+ val denseMatrix = new DenseMatrix(actualNumRows, actualNumColumns,
matrixAsArray)
+ val finalMatrix = if (countForValues / arraySize.toDouble >= 0.1) {
+ denseMatrix
+ } else {
+ denseMatrix.toSparse
+ }
+
+ ((blockRow, blockColumn), finalMatrix)
+ }
+ new BlockMatrix(blocks, rowsPerBlock, colsPerBlock, this.numRows(),
this.numCols())
--- End diff --
Nit: can the last two args simply be m, n for clarity?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org