Github user yu-iskw commented on a diff in the pull request:
https://github.com/apache/spark/pull/8563#discussion_r43610490
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
---
@@ -402,4 +441,474 @@ class BlockMatrix @Since("1.3.0") (
s"A.colsPerBlock: $colsPerBlock, B.rowsPerBlock:
${other.rowsPerBlock}")
}
}
+
+ /**
+ * Schur Complement of a BlockMatrix. For a matrix that is in 4
partitions:
+ * A=[a11, a12; a21; a22], the Schur Complement S is S = a22 - (a21 *
a11^-1 * a12).
+ * The Schur Complement is always (n-1) x (n-1), which is the size of
a22. a11 is expected
+ * to fit into memory so that Breeze inversions can be computed.
+ *
+ * @return BlockMatrix Schur Complement as BlockMatrix
+ * @since 1.6.0
+ */
+ private[mllib] def SchurComplement: BlockMatrix = {
+ require(this.numRowBlocks == this.numColBlocks, "Block Matrix must be
square.")
+ require(this.numRowBlocks > 1, "Block Matrix must be larger than one
block.")
+ val topRange = (0, 0); val botRange = (1, this.numColBlocks - 1)
+ val a11 = this.subBlock(topRange, topRange)
+ val a12 = this.subBlock(topRange, botRange)
+ val a21 = this.subBlock(botRange, topRange)
+ val a22 = this.subBlock(botRange, botRange)
+
+ val a11Brz = inv(a11.toBreeze) // note that intermediate a11 calcs
derive from inv(a11)
+ val a11Mtx = Matrices.dense(a11.numRows.toInt, a11.numCols.toInt,
a11Brz.toArray)
+ val a11RDD = this.blocks.sparkContext.parallelize(Seq(((0, 0),
a11Mtx)))
+ val a11Inv = new BlockMatrix(a11RDD, this.rowsPerBlock,
this.colsPerBlock)
+
+ val S = a22.subtract(a21.multiply(a11Inv.multiply(a12)))
+ S
+ }
+
+ /**
+ * Returns a rectangular (sub)BlockMatrix with block ranges as
specified. Block Ranges
+ * refer to a range of blocks that each contain a matrix. The returned
BlockMatrix
+ * is numbered so that the upper left block is indexed as (0,0).
+ *
+ *
+ * @param blockRowRange The lower and upper row range of blocks, as
(Int,Int)
+ * @param blockColRange The lower and upper col range of blocks, as
(Int, Int)
+ * @return a BlockMatrix with (0,0) as the upper leftmost block index
+ * @since 1.6.0
+ */
+ private [mllib] def subBlock(blockRowRange: (Int, Int), blockColRange:
(Int, Int)):
+ BlockMatrix = {
+ // Extracts BlockMatrix elements from a specified range of block
indices
+ // Creating a Sub BlockMatrix of rectangular shape.
+ // Also reindexes so that the upper left block is always (0, 0)
+
+ // TODO: add a require statement ...rowMax<=size..
+ val rowMin = blockRowRange._1; val rowMax = blockRowRange._2
+ val colMin = blockColRange._1 ; val colMax = blockColRange._2
+ val extractedSeq = this.blocks.filter{ case((x, y), matrix) =>
+ x >= rowMin && x<= rowMax && // finding blocks
+ y >= colMin && y<= colMax }.map { // shifting indices
+ case(((x, y), matrix) ) => ((x-rowMin, y-colMin), matrix)
+ }
+ new BlockMatrix(extractedSeq, rowsPerBlock, colsPerBlock)
+ }
+
+ /**
+ * Computes the LU decomposition of a Single Block from BlockMatrix
using the
+ * Breeze LU method. The method (as written) operates -only- on the
upper
+ * left (0,0) corner of the BlockMatrix.
+ *
+ * @return List[BDM[Double]] of Breeze Matrices (BDM) (P,L,U) for
blockLU method.
+ * @since 1.6.0
+ */
+ private [mllib] def singleBlockPLU: List[BDM[Double]] = {
+ // returns PA = LU factorization from Breeze
+ val PLU = LU(this.subBlock((0, 0), (0, 0)).toBreeze)
+ val k = PLU._1.cols
+ val L = lowerTriangular(PLU._1) - diag(diag(PLU._1)) +
diag(DenseVector.fill(k){1.0})
+ val U = upperTriangular(PLU._1);
+ var P = diag(DenseVector.fill(k){1.0})
+ var Pi = diag(DenseVector.fill(k){1.0})
+ // size of square matrix
+ // populating permutation matrix
+ var i = 0
+ while (i < k) {
+ val I = {
+ if (i == 0){k - 1}
+ else {i - 1}
+ }
+ val J = PLU._2(i) -1
+ if (i != J) {
+ Pi(i, J) += 1.0
+ Pi(J, i) += 1.0
+ Pi(i, i) -= 1.0
+ Pi(J, J) -= 1.0
+ }
+ P = Pi * P // constructor Pi*P for PA=LU
+ // resetting Pi for next iteration
+ if (i != J) {
+ Pi(i, J) -= 1.0
+ Pi(J, i) -= 1.0
+ Pi(i, i) += 1.0
+ Pi(J, J) += 1.0
+ }
+ i += 1
+ }
+ List(P, L, U)
+ }
+
+
+ /**
+ * This method reassigns 'absolute' index locations (i,j), to sequences.
This is
+ * designed to reconsitute the orignal block locations that were lost in
the
+ * subBlock method.
+ *
+ * @param rowMin The new lowest row value
+ * @param colMin The new lowest column value
+ * @return an RDD of Sequences with new block indexing
+ * @since 1.6.0
+ *
+ */
+ private [mllib] def shiftIndices(rowMin: Int, colMin: Int): RDD[((Int,
Int), Matrix)] = {
+ // This routine recovers the absolute indexing of the block matrices
for reassembly
+ val extractedSeq = this.blocks.map { // shifting indices
+ case(((x, y), matrix)) => ((x + rowMin, y + colMin), matrix)
+ }
+ extractedSeq
+ }
+
+ /**
+ * A class that contains the 3 main BlockMatrix items to be returned
+ * when calling blockLU.
+ *
+ * @param p The Permutation BlockMatrix
+ * @param l Lower Diagonal BlockMatrix
+ * @param u Upper Diagonal BlockMatrix
+ *
+ */
+
+ private [mllib] class PLU(p: BlockMatrix, l: BlockMatrix, u: BlockMatrix
){
+ val P = p
+ val L = l
+ val U = u
+ }
+
+ /**
+ * Extends the base class PLU with additional matrices that are used
+ * int he solve method.
+ *
+ * @param p The Permutation BlockMatrix
+ * @param l Lower Diagonal BlockMatrix
+ * @param u Upper Diagonal BlockMatrix
+ *
+ * @param lInv The inverse of the lower diagaonal matrices (in (i,i)th
+ * cells only).
+ * @param uInv The inverse of the upper diagaonal matrices (in (i,i)th
+ * cells only).
+ */
+ private [mllib] class PLUandInverses(p: BlockMatrix, l: BlockMatrix, u:
BlockMatrix,
+
+ lInv: BlockMatrix, uInv: BlockMatrix) extends
PLU(p, l, u) {
+ val dLi = lInv
+ val dUi = uInv
+ }
+
+ /**
+ * Computes the LU Decomposition of a Square Matrix. For a matrix A of
size (n x n)
+ * LU decomposition computes the Lower Triangular Matrix L, the Upper
Triangular
+ * Matrix U, along with a Permutation Matrix P, such that PA=LU. The
Permutation
+ * Matrix addresses cases where zero entries prevent forward substitution
+ * solution of L or U.
+ *
+ * The BlockMatrix version takes a BlockMatrix as an input and returns a
Tuple
+ * of 5 BlockMatrix objects:
+ * P, L, U (in that order), such that P.multiply(A)-L.multiply(U) = 0
+ * and Li, Ui, which are the inverse of the block diagonal terms for L
and U.
+ *
+ * The blockLU method will return only P,L, and U, but blockLUtoSolver
will return
+ * the extra Li and Ui matrices, which will be used by the solve method
+ * so that it does not need to recompute these values.
+ *
+ * The method follows a procedure similar to the method used in
ScaLAPACK, but
+ * places more emphasis on preparing BlockMatrix objects as inputs to
large
+ * BlockMatrix.multiply operations.
+ *
+ *
+ * @return PLUandInverses(P,L,U,Li,Ui) as a Tuple of BlockMatrix
+ * @since 1.6.0
+ */
+ private [mllib] def blockLUtoSolver: PLUandInverses = {
+
+ // builds up the array as a union of RDD sets
+ val nDiagBlocks = this.numColBlocks
+ // Matrix changes shape during recursion...the "absolute location"
must be
+ // preserved when reconstructing.
+ val rowsAbs = this.numRowBlocks; val colsAbs = rowsAbs
+ // accessing the spark context
+ val sc = this.blocks.sparkContext
+
+ /**
+ * LUSequences is a class that is defined to make the
recursiveSequencesBuild section
+ * more readable.
+ *
+ * These are passed as an RDD of blocks:
+ * @param p the permutation matrix.
+ * @param l the lower diagonal matrix.
+ * @param u the upper diagonal matrix.
+ * @param lInv the inverse lower diagonal matrix (only populating
(i,i) cells).
+ * @param uInv the inverse upper diagonal matrix (only populating
(i,i) cells).
+ * @param lDiag the lower diagonal matrices (only populating (i,i)
cells).
+ * @param uDiag the upper diagonal matrices (only populating (i,i)
cells).
+ * This is passed as a BlockMatrix
+ * @param a the Schur Complement from the previous iteration, treated
as the source matrix
+ * for the next iteraton.
+ *
+ *
+ * @Since("1.6.0")
+ */
+ class LUSequences(p: RDD[((Int, Int), Matrix)], l: RDD[((Int, Int),
Matrix)],
+ u: RDD[((Int, Int), Matrix)],
+ lInv: RDD[((Int, Int), Matrix)], uInv: RDD[((Int,
Int), Matrix)],
+ lDiag: RDD[((Int, Int), Matrix)], uDiag: RDD[((Int,
Int), Matrix)],
+ a: BlockMatrix) {
+ val P = p // the permutation matrix.
+ val L = l // the lower diagonal matrix.
+ val U = u // the upper diagonal matrix.
+ val Li = lInv // the inverse lower diagonal matrix (only populating
(i,i) cells).
+ val Ui = uInv // the inverse upper diagonal matrix (only populating
(i,i) cells).
+ val LD = lDiag // the lower diagonal matrices (only populating (i,i)
cells).
+ val UD = uDiag // he upper diagonal matrices (only populating (i,i)
cells).
+ val A = a // the Schur Complement: BlockMatrix from the previous
iteration, treated
+ // as the source matrix for the next iteraton.
+ }
+
+ /**
+ * Recursive Sequence Build is a nested recursion method that builds
up all of the
+ * sequences that are converted to BlockMatrix classes for large matrix
+ * multiplication operations. The Schur Complement is calculated at
each
+ * recursion step and fed into the next iteration as the input matrix.
+ *
+ * dP, dL, dU, dLi, dUi have the solutions to LU(S) in the (i,i)
(diagonal) blocks.
+ * dLi and dUi, are the inverses of each block in dL and dU. UD and
LD contain the
+ * extracted subBlocks from the incoming matrix (which is the Schur
Complement
+ * from the previous iteration). These matrices occupy the U12 and
L21 spaces at
+ * each iteration, and form the strict upper and lower block diagonal
matrices,
+ * respectively. This means that for UD, only (i,j>i) blocks are
populated with
+ * the cascading Schur calculations, while for LD, (i, j<i) blocks are
populated.
+ *
+ * @param rowI
+ * @param prev
+ * @return dP, dL, dU, dLi, dUi, LD, UD, S All are RDDs of Sequences
that are
+ * iteratively built, while S is a BlockMatrix used in the
recursion loop
+ * @since 1.6.0
+ */
+ def recursiveSequencesBuild(rowI: Int, prev: LUSequences): LUSequences
= {
+
+ val rowsRel = prev.A.numRowBlocks; val colsRel = prev.A.numColBlocks
--- End diff --
don't declare multiple variables in the same line.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]