Github user feynmanliang commented on a diff in the pull request:
https://github.com/apache/spark/pull/4286#discussion_r38369418
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
---
@@ -246,4 +246,135 @@ class BlockMatrix(
val localMat = toLocalMatrix()
new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray)
}
+
+ /**
+ * Re-blocks this matrix using its current `rowsPerBlock` and `colsPerBlock` (the values that
+ * were passed to the constructor). A new [[BlockMatrix]] is returned even when every
+ * [[MatrixBlock]] (other than those on the right and bottom edges) already has dimensions
+ * `rowsPerBlock`x`colsPerBlock`.
+ */
+ def repartition(): BlockMatrix = repartition(rowsPerBlock, colsPerBlock)
+
+ /**
+ * Re-blocks this matrix into [[MatrixBlock]]s of dimensions
+ * `newRowsPerBlock`x`newColsPerBlock`, passing the current number of partitions of the
+ * underlying RDD as the suggested partition count. A new [[BlockMatrix]] is returned even when
+ * every [[MatrixBlock]] (other than those on the right and bottom edges) already has these
+ * dimensions. The resulting number of partitions may differ from the current one.
+ *
+ * @param newRowsPerBlock Number of rows that make up each MatrixBlock.
+ * @param newColsPerBlock Number of columns that make up each MatrixBlock.
+ * @return The repartitioned BlockMatrix
+ */
+ def repartition(newRowsPerBlock: Int, newColsPerBlock: Int): BlockMatrix = {
+ val currentNumPartitions = blocks.partitions.size
+ repartition(newRowsPerBlock, newColsPerBlock, suggestedNumPartitions = currentNumPartitions)
+ }
+
+ /**
+ * Forms [[MatrixBlock]]s using the provided dimensions
`newRowsPerBlock`x`newColsPerBlock`
+ * Even if all [[MatrixBlock]]s (except the ones on the right and bottom
edge)
+ * have dimensions `newRowsPerBlock`x`newColsPerBlock`, a new
[[BlockMatrix]] will be returned.
+ * The resulting number of partitions may be different than
`suggestedNumPartitions`. Assumes
+ * that the offsets of each block is
+ * (`blockRowIndex`x`rowsPerBlock`, `blockColIndex`x`colsPerBlock`).
+ *
+ * If there are any [[MatrixBlock]]s with dimensions greater than
`rowsPerBlock`x`colsPerBlock`,
+ * the data outside this scope will be added on to the data of the
neighboring [[MatrixBlock]]s.
+ *
+ * @param newRowsPerBlock Number of rows that make up each block.
+ * @param newColsPerBlock Number of columns that make up each block.
+ * @param suggestedNumPartitions Number of partitions to partition the
underlying RDD in. The
+ * final number of partitions may not
equal this number.
+ * @return The repartitioned BlockMatrix
+ */
+ def repartition(
+ newRowsPerBlock: Int,
+ newColsPerBlock: Int,
+ suggestedNumPartitions: Int): BlockMatrix = {
+ require(newRowsPerBlock > 0,
+ s"newRowsPerBlock must be greater than 0. newRowsPerBlock:
$newRowsPerBlock")
+ require(newColsPerBlock > 0,
+ s"newColsPerBlock must be greater than 0. newColsPerBlock:
$newColsPerBlock")
+ require(suggestedNumPartitions > 0, s"suggestedNumPartitions must be
greater than 0. " +
+ s"suggestedNumPartitions: $suggestedNumPartitions")
+ val m = numRows()
+ val n = numCols()
+ val newNumRowBlocks = math.ceil(m * 1.0 / newRowsPerBlock).toInt
+ val newNumColBlocks = math.ceil(n * 1.0 / newColsPerBlock).toInt
+ val slicedBlocks = blocks.flatMap { case ((blockRowIndex,
blockColIndex), mat) =>
+ val rowStartOffset = blockRowIndex * rowsPerBlock
+ // val rowEndOffset = rowStartOffset + mat.numRows
+ val colStartOffset = blockColIndex * colsPerBlock
+ // val colEndOffset = colStartOffset + mat.numCols
+
+ /*
+ // The range of indices that the parts of this block are going to be
mapped to
+ val rowIndex = rowStartOffset / newRowsPerBlock
+ val endRowIndex = math.ceil(rowEndOffset * 1.0 /
newRowsPerBlock).toInt
+ val colIndex = colStartOffset / newColsPerBlock
+ val endColIndex = math.ceil(colEndOffset * 1.0 /
newColsPerBlock).toInt
+
+ // The (Int, Int) key correspond to the index in the grid that this
block now belongs to
+ // In (Int, Int, Matrix), the first Int is the row offset that the
subBlock will have
+ // in the new block it's going to be a part of. The second Int is
the column offset.
+
+
+ val subBlocks = new ArrayBuffer[((Int, Int), (Int, Int, Matrix))](
+ (endRowIndex - rowIndex) * (endColIndex - colIndex))
+
+ var colIdx = colIndex
+ while (colIdx < endColIndex) {
+ var rowIdx = rowIndex
+ while (rowIdx < endRowIndex) {
+ // The indices to slice from the matrix
+ val sliceRowStart =
+ math.max(rowStartOffset, rowIdx * newRowsPerBlock) -
rowStartOffset
+ val sliceRowEnd =
+ math.min(rowEndOffset, (rowIdx + 1) * newRowsPerBlock) -
rowStartOffset
+ val sliceColStart =
+ math.max(colStartOffset, colIdx * newColsPerBlock) -
colStartOffset
+ val sliceColEnd =
+ math.min(colEndOffset, (colIdx + 1) * newColsPerBlock) -
colStartOffset
+ // slice matrix
+ val slicedMat = mat.toBreeze(sliceRowStart until sliceRowEnd,
+ sliceColStart until sliceColEnd).toDenseMatrix
+
+ subBlocks.append(((rowIdx, colIdx), (
+ sliceRowStart + rowStartOffset - rowIdx * newRowsPerBlock,
+ sliceColStart + colStartOffset - colIdx * newColsPerBlock,
+ Matrices.fromBreeze(slicedMat))))
+ rowIdx += 1
+ }
+ colIdx += 1
+ }
+ subBlocks
+ */
+ val values = new ArrayBuffer[((Int, Int), (Int, Int, Double))]()
+ mat.foreachActive { (i, j, v) =>
+ val targetBlockRowIndex = (rowStartOffset + i) / newRowsPerBlock
+ val targetBlockColIndex = (colStartOffset + j) / newColsPerBlock
+ val targetRowOffset = (rowStartOffset + i) - newRowsPerBlock *
targetBlockRowIndex
+ val targetColOffset = (colStartOffset + j) - newColsPerBlock *
targetBlockColIndex
+ values.append(
+ ((targetBlockRowIndex, targetBlockColIndex), (targetRowOffset,
targetColOffset, v)))
+ }
+ values
+ }
+ val newPartitioner = GridPartitioner(newNumRowBlocks, newNumColBlocks,
suggestedNumPartitions)
+ val newMatrixBlocksRDD: RDD[MatrixBlock] =
slicedBlocks.groupByKey(newPartitioner).
--- End diff --
Nit: put the line break before the "." instead of after it, so the continuation line starts with the chained call.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]