As I understand it, the core of ALS in Spark 1.0.0 is the following code.
There should be both flatMap and groupByKey tasks when running ALS iterations, right?
But when I run an ALS iteration, there are ONLY flatMap tasks...
Do you know why?
private def updateFeatures(
    products: RDD[(Int, Array[Array[Double]])],
    productOutLinks: RDD[(Int, OutLinkBlock)],
    userInLinks: RDD[(Int, InLinkBlock)],
    partitioner: Partitioner,
    rank: Int,
    lambda: Double,
    alpha: Double,
    YtY: Option[Broadcast[DoubleMatrix]])
  : RDD[(Int, Array[Array[Double]])] =
{
  val numBlocks = products.partitions.size
  productOutLinks.join(products).flatMap { case (bid, (outLinkBlock, factors)) =>
    // For each destination user block, collect the product factors it needs
    val toSend = Array.fill(numBlocks)(new ArrayBuffer[Array[Double]])
    for (p <- 0 until outLinkBlock.elementIds.length; userBlock <- 0 until numBlocks) {
      if (outLinkBlock.shouldSend(p)(userBlock)) {
        toSend(userBlock) += factors(p)
      }
    }
    toSend.zipWithIndex.map { case (buf, idx) => (idx, (bid, buf.toArray)) }
  }.groupByKey(new HashPartitioner(numBlocks)) // The 1.0.0 ALS code has a bug here:
                                               // that version uses the passed-in
                                               // partitioner, which is ineffective
                                               // and leads to data skew
    .join(userInLinks)
    .mapValues { case (messages, inLinkBlock) =>
      updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
    }
}
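To illustrate the data-skew point flagged in the comment above, here is a minimal plain-Scala sketch (no Spark; the object and method names are hypothetical). It mimics how a hash partitioner assigns an Int key to a partition and shows that when the keys share a common factor with the partition count, every record can land in the same partition:

```scala
import scala.collection.mutable.ArrayBuffer

object PartitionSkewSketch {
  // Mimics hash partitioning of an Int key: non-negative modulo of its hashCode
  // (for Int, hashCode is the value itself).
  def hashPartition(key: Int, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  def main(args: Array[String]): Unit = {
    val numBlocks = 4
    // Hypothetical block ids that all share a common factor with numBlocks
    val blockIds = Seq(0, 4, 8, 12, 16, 20, 24, 28)
    // Count how many ids fall into each partition
    val counts = blockIds
      .groupBy(hashPartition(_, numBlocks))
      .map { case (p, ids) => p -> ids.size }
    println(counts) // every id hashes to partition 0: severe skew
  }
}
```

A partitioner chosen to match how the keys were generated (here, the caller-supplied one) would spread these ids evenly, which is why using the wrong partitioner at the groupByKey shows up as skewed task sizes.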
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-types-of-tasks-when-running-ALS-iterations-tp21966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.