Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/17459#discussion_r111072525
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/IndexedRowMatrix.scala
---
@@ -108,8 +108,64 @@ class IndexedRowMatrix @Since("1.0.0") (
*/
@Since("1.3.0")
def toBlockMatrix(rowsPerBlock: Int, colsPerBlock: Int): BlockMatrix = {
- // TODO: This implementation may be optimized
- toCoordinateMatrix().toBlockMatrix(rowsPerBlock, colsPerBlock)
+ require(rowsPerBlock > 0,
+ s"rowsPerBlock needs to be greater than 0. rowsPerBlock:
$rowsPerBlock")
+ require(colsPerBlock > 0,
+ s"colsPerBlock needs to be greater than 0. colsPerBlock:
$colsPerBlock")
+
+ val m = numRows()
+ val n = numCols()
+ val lastRowBlockIndex = m / rowsPerBlock
+ val lastColBlockIndex = n / colsPerBlock
+ val lastRowBlockSize = (m % rowsPerBlock).toInt
+ val lastColBlockSize = (n % colsPerBlock).toInt
+ val numRowBlocks = math.ceil(m.toDouble / rowsPerBlock).toInt
+ val numColBlocks = math.ceil(n.toDouble / colsPerBlock).toInt
+
+ val blocks = rows.flatMap { ir: IndexedRow =>
+ val blockRow = ir.index / rowsPerBlock
+ val rowInBlock = ir.index % rowsPerBlock
+
+ ir.vector match {
+ case SparseVector(size, indices, values) =>
+ indices.zip(values).map { case (index, value) =>
+ val blockColumn = index / colsPerBlock
+ val columnInBlock = index % colsPerBlock
+ ((blockRow.toInt, blockColumn.toInt), (rowInBlock.toInt,
Array((value, columnInBlock))))
+ }
+ case DenseVector(values) =>
+ values.grouped(colsPerBlock)
+ .zipWithIndex
+ .map { case (values, blockColumn) =>
+ ((blockRow.toInt, blockColumn), (rowInBlock.toInt,
values.zipWithIndex))
+ }
+ }
+ }.groupByKey(GridPartitioner(numRowBlocks, numColBlocks,
rows.getNumPartitions)).map {
+ case ((blockRow, blockColumn), itr) =>
+ val actualNumRows =
+ if (blockRow == lastRowBlockIndex) lastRowBlockSize else
rowsPerBlock
+ val actualNumColumns =
+ if (blockColumn == lastColBlockIndex) lastColBlockSize else
colsPerBlock
+
+ val arraySize = actualNumRows * actualNumColumns
+ val matrixAsArray = new Array[Double](arraySize)
+ var countForValues = 0
+ itr.foreach { case (rowWithinBlock, valuesWithColumns) =>
+ valuesWithColumns.foreach { case (value, columnWithinBlock) =>
+ matrixAsArray.update(columnWithinBlock * actualNumRows +
rowWithinBlock, value)
+ countForValues += 1
+ }
+ }
+ val denseMatrix = new DenseMatrix(actualNumRows, actualNumColumns,
matrixAsArray)
+ val finalMatrix = if (countForValues / arraySize.toDouble >= 0.5) {
--- End diff --
I am not sure whether 0.5 is an appropriate threshold in practice. Maybe
others have better ideas about it.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]