[ https://issues.apache.org/jira/browse/SPARK-20486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Daniel Li updated SPARK-20486:
--
Description:
The in-block and out-block data structures in the ALS code are currently
computed within the {{ALS.train}} method itself. I propose moving this code,
along with its helper functions, into a separate class that encapsulates the
creation of the blocks. This has the added benefit of letting us add a
comprehensive Scaladoc to the new class explaining in detail how this core
part of the algorithm works.
Proposal:
{code}
private[recommendation] final case class RatingBlocks[ID](
    userIn: RDD[(Int, InBlock[ID])],
    userOut: RDD[(Int, OutBlock)],
    itemIn: RDD[(Int, InBlock[ID])],
    itemOut: RDD[(Int, OutBlock)])

private[recommendation] object RatingBlocks {
  def create[ID: ClassTag: Ordering](
      ratings: RDD[Rating[ID]],
      numUserBlocks: Int,
      numItemBlocks: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RatingBlocks[ID] = {
    val userPart = new ALSPartitioner(numUserBlocks)
    val itemPart = new ALSPartitioner(numItemBlocks)
    // Group the raw ratings by (userBlockId, itemBlockId).
    val blockRatings =
      partitionRatings(ratings, userPart, itemPart)
        .persist(storageLevel)
    val (userInBlocks, userOutBlocks) =
      makeBlocks("user", blockRatings, userPart, itemPart, storageLevel)
    userOutBlocks.count() // materialize `blockRatings` and user blocks
    // Swap the block keys and the src/dst roles to build the item-side blocks.
    val swappedBlockRatings = blockRatings.map {
      case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =>
        ((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
    }
    val (itemInBlocks, itemOutBlocks) =
      makeBlocks("item", swappedBlockRatings, itemPart, userPart, storageLevel)
    itemOutBlocks.count() // materialize item blocks
    blockRatings.unpersist()
    RatingBlocks(userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks)
  }

  private[this] def partitionRatings[ID: ClassTag](...) = {
    // existing code goes here verbatim
  }

  private[this] def makeBlocks[ID: ClassTag](...) = {
    // existing code goes here verbatim
  }
}
{code}
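To make the data flow of the proposed {{create}} method concrete without a Spark cluster, here is a minimal plain-Scala sketch of the two steps it performs around {{makeBlocks}}: grouping ratings into (user block, item block) cells, and swapping the keys and ID roles to obtain the item-side view. All names in it ({{RatingBlockSketch}}, {{SimpleRating}}, {{LocalRatingBlock}}, {{blockOf}}, {{partitionLocally}}, {{swap}}) are hypothetical stand-ins, not Spark APIs. It assumes {{ALSPartitioner}} behaves like Spark's hash partitioner (non-negative {{hashCode}} modulo the number of blocks), and it does not model the in-block/out-block encoding that {{makeBlocks}} produces.

```scala
// Hypothetical, Spark-free sketch of the grouping done by `partitionRatings`
// and the user/item swap in `RatingBlocks.create`, using plain collections.
object RatingBlockSketch {

  // Hypothetical stand-ins for Spark's `Rating[ID]` and `RatingBlock[ID]`.
  final case class SimpleRating(user: Int, item: Int, rating: Float)
  final case class LocalRatingBlock(
      srcIds: Vector[Int],
      dstIds: Vector[Int],
      ratings: Vector[Float])

  // Assigns an ID to a block via a non-negative hashCode modulo numBlocks,
  // in the spirit of a hash partitioner (assumed behavior of ALSPartitioner).
  def blockOf(id: Int, numBlocks: Int): Int = {
    val raw = id.hashCode % numBlocks
    if (raw < 0) raw + numBlocks else raw
  }

  // Groups ratings into cells keyed by (userBlockId, itemBlockId); each cell
  // keeps parallel vectors of user IDs, item IDs, and rating values.
  def partitionLocally(
      ratings: Seq[SimpleRating],
      numUserBlocks: Int,
      numItemBlocks: Int): Map[(Int, Int), LocalRatingBlock] =
    ratings
      .groupBy(r => (blockOf(r.user, numUserBlocks), blockOf(r.item, numItemBlocks)))
      .map { case (key, rs) =>
        key -> LocalRatingBlock(
          rs.map(_.user).toVector,
          rs.map(_.item).toVector,
          rs.map(_.rating).toVector)
      }

  // Mirrors the `swappedBlockRatings` step: the block key becomes
  // (itemBlockId, userBlockId) and items become the source IDs.
  def swap(blocks: Map[(Int, Int), LocalRatingBlock]): Map[(Int, Int), LocalRatingBlock] =
    blocks.map { case ((userBlockId, itemBlockId), LocalRatingBlock(userIds, itemIds, rs)) =>
      ((itemBlockId, userBlockId), LocalRatingBlock(itemIds, userIds, rs))
    }

  def main(args: Array[String]): Unit = {
    val ratings = Seq(
      SimpleRating(0, 10, 4.0f),
      SimpleRating(1, 11, 3.0f),
      SimpleRating(2, 10, 5.0f),
      SimpleRating(3, 12, 1.0f))
    val userSide = partitionLocally(ratings, numUserBlocks = 2, numItemBlocks = 3)
    val itemSide = swap(userSide)
    // Print both views sorted by block key for readability.
    userSide.toSeq.sortBy(_._1).foreach(println)
    itemSide.toSeq.sortBy(_._1).foreach(println)
  }
}
```

Running {{main}} prints the user-side cells keyed by {{(userBlockId, itemBlockId)}}, then the item-side cells keyed by {{(itemBlockId, userBlockId)}}. With the proposal applied, the block-building code in {{ALS.train}} would presumably shrink to a single call such as {{val RatingBlocks(userInBlocks, userOutBlocks, itemInBlocks, itemOutBlocks) = RatingBlocks.create(ratings, numUserBlocks, numItemBlocks, intermediateRDDStorageLevel)}}; the exact argument names at the call site are an assumption.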
was:
The in-block and out-block data structures in the ALS code are currently
computed within the {{ALS.train}} method itself. I propose moving this code,
along with its helper functions, into a separate class that encapsulates the
creation of the blocks. This has the added benefit of letting us add a
comprehensive Scaladoc to the new class explaining in detail how this core
part of the algorithm works.
Proposal:
{code}
private[recommendation] final case class RatingBlocks[ID](
    userIn: RDD[(Int, InBlock[ID])],
    userOut: RDD[(Int, OutBlock)],
    itemIn: RDD[(Int, InBlock[ID])],
    itemOut: RDD[(Int, OutBlock)])

private[recommendation] object RatingBlocks {
  def create[ID: ClassTag: Ordering](
      ratings: RDD[Rating[ID]],
      numUserBlocks: Int,
      numItemBlocks: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK): RatingBlocks[ID] = {
    // In-block and out-block code currently in `ALS.train` goes here
  }

  private[this] def partitionRatings[ID: ClassTag](...) = { ... }

  private[this] def makeBlocks[ID: ClassTag](...) = { ... }
}
{code}
> Encapsulate ALS in-block and out-block data structures and methods into a
> separate class
>
>
> Key: SPARK-20486
> URL: https://issues.apache.org/jira/browse/SPARK-20486
> Project: Spark
> Issue Type: Improvement
> Components: ML, MLlib
> Affects Versions: 2.1.0
> Reporter: Daniel Li
> Priority: Trivial