Hi Bharath, I’m not sure this is your problem, but MatrixFactorizationModel in 
MLlib, which is the underlying component for ALS, expects your user and product 
fields to be integers. Specifically, the input to ALS is an RDD[Rating], and 
Rating is an (Int, Int, Double). I wonder whether one of your identifiers 
exceeds Integer.MAX_VALUE. Could you write a quick check for that?
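
Such a check could look roughly like the sketch below. It is plain Python rather than Spark code, and the function name and the (user, product, rating) tuple layout are assumptions made for illustration; the same comparison would need to run over the raw identifiers before they are converted to Int.

```python
# Bounds of a 32-bit signed JVM Int (Integer.MIN_VALUE and Integer.MAX_VALUE).
INT_MIN = -2**31
INT_MAX = 2**31 - 1

def find_out_of_range_ids(records):
    """Return (user_id, product_id) pairs where either ID does not fit in an Int.

    `records` is an iterable of (user_id, product_id, rating) tuples.
    This layout is hypothetical; adapt it to the real input format.
    """
    bad = []
    for user_id, product_id, _rating in records:
        user_ok = INT_MIN <= user_id <= INT_MAX
        product_ok = INT_MIN <= product_id <= INT_MAX
        if not (user_ok and product_ok):
            bad.append((user_id, product_id))
    return bad

# Made-up sample rows, not taken from the real dataset.
sample = [
    (1, 10, 4.0),
    (2_147_483_647, 11, 3.5),  # exactly Integer.MAX_VALUE: still fits
    (2_147_483_648, 12, 5.0),  # one past the limit: does not fit
]
print(find_out_of_range_ids(sample))
```

An empty result on the full input would rule out identifier overflow as the cause.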

I have been running a very similar use case to yours (with more constrained 
hardware resources) and I haven’t seen this exact problem but I’m sure we’ve 
seen similar issues. Please let me know if you have other questions.

From: Bharath Ravi Kumar <reachb...@gmail.com>
Date: Thursday, November 27, 2014 at 1:30 PM
To: "user@spark.apache.org" <user@spark.apache.org>
Subject: ALS failure with size > Integer.MAX_VALUE

We're training a recommender with ALS in MLlib 1.1 against a dataset of 150M 
users and 4.5K items, with the total number of training records being 1.2 
billion (~30GB data). The input data is spread across 1200 partitions on HDFS. 
For the training, rank=10, and we've configured {number of user data blocks = 
number of item data blocks}. The number of user/item blocks was varied between 
50 and 1200. Irrespective of the block size (e.g. at 1200 blocks each), there 
are at least a couple of tasks that end up shuffle reading > 9.7G each in the 
aggregate stage (ALS.scala:337) and failing with the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)



As for the data, on the user side, the degree of a node in the connectivity 
graph is relatively small. However, on the item side, 3.8K out of the 4.5K 
items are connected to 10^5 users each on average, with 100 items being 
connected to nearly 10^8 users. The rest of the items are connected to fewer 
than 10^5 users. With such a skew in the connectivity graph, I'm unsure if 
additional memory or variation in the block sizes would help (considering my 
limited understanding of the implementation in MLlib). Any suggestions to 
address the problem?
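
To illustrate the concern about skew, here is a minimal sketch in plain Python. It assumes that all ratings for a given item end up in one block and that items are assigned to blocks by item_id modulo the block count; both are assumptions about the implementation, not confirmed behaviour of MLlib. The function name and the toy degree counts are hypothetical and scaled far down from the real dataset.

```python
from collections import Counter

def max_block_load(item_degrees, num_blocks):
    """Return the record count of the heaviest block.

    item_degrees: dict mapping item_id -> number of ratings for that item.
    Each item is assigned to block (item_id % num_blocks), which is an
    assumed partitioning scheme used only for illustration.
    """
    loads = Counter()
    for item_id, degree in item_degrees.items():
        loads[item_id % num_blocks] += degree
    return max(loads.values())

# Toy data, NOT the real dataset: 5 heavy items and 40 light ones.
degrees = {item_id: 1_000_000 for item_id in range(5)}
degrees.update({item_id: 1_000 for item_id in range(5, 45)})

for blocks in (5, 50, 1200):
    print(blocks, max_block_load(degrees, blocks))
```

Under these assumptions the heaviest block can never hold fewer records than the single heaviest item, so raising the block count past a certain point stops reducing the largest block.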


The test is being run on a standalone cluster of 3 hosts, each with 100G RAM & 
24 cores dedicated to the application. The additional configs I made specific 
to the shuffle and task failure reduction are as follows:

spark.core.connection.ack.wait.timeout=600
spark.shuffle.consolidateFiles=true
spark.shuffle.manager=SORT


The job execution summary is as follows:

Active Stages:

Stage id 2, aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200 (3 failed), Shuffle Read: 141.6 GB

Completed Stages (5)

Stage Id  Description                                      Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
6         org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)  12 min    1200/1200               29.9 GB  1668.4 MB     186.8 GB
5         mapPartitionsWithIndex at ALS.scala:250
3         map at ALS.scala:231
0         aggregate at ALS.scala:337
1         map at ALS.scala:228


Thanks,
Bharath
