Hi Stahlman,

finalRDDStorageLevel is the storage level for the final user/item
factors. It is not common to set it to StorageLevel.NONE unless you
want to save the factors directly to disk. If it is NONE, the final
user/item factors returned are not materialized, so we cannot
unpersist the intermediate RDDs (in/out blocks): if we did, they would
have to be recomputed from the very beginning (or the last checkpoint)
when you materialize the final user/item factors. If you want to do
multiple runs, you can try setting finalRDDStorageLevel to
MEMORY_AND_DISK, or clean up previous runs so the cached RDDs get
garbage collected.
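
To make the reasoning above concrete, here is a small toy model in plain
Python. It is not Spark code and does not use any Spark API: ToyRDD and
simulate are hypothetical names invented for this sketch, and the "blocks"
and "factors" are just short lists. It only mimics lazy evaluation with
caching, to show why dropping the cached intermediates is cheap when the
final result is already materialized and expensive when it is not.

```python
# Toy model of lazy evaluation with caching. This is NOT Spark code:
# ToyRDD and simulate are hypothetical names used only for illustration.

class ToyRDD:
    def __init__(self, name, compute, parents=()):
        self.name = name
        self._compute = compute        # function of the parents' results
        self.parents = list(parents)
        self.persisted = False
        self._cache = None
        self.compute_count = 0         # how many times this node was computed

    def persist(self):
        self.persisted = True
        return self

    def unpersist(self):
        self.persisted = False
        self._cache = None
        return self

    def collect(self):
        # Serve from the cache only if persisted and already materialized.
        if self.persisted and self._cache is not None:
            return self._cache
        inputs = [p.collect() for p in self.parents]
        self.compute_count += 1
        result = self._compute(*inputs)
        if self.persisted:
            self._cache = result
        return result


def simulate(persist_final, drop_intermediates):
    ratings = ToyRDD("ratingBlocks", lambda: [1, 2, 3]).persist()
    in_blocks = ToyRDD("userInBlocks", lambda r: [x * 2 for x in r],
                       [ratings]).persist()
    factors = ToyRDD("userFactors", lambda b: [x + 1 for x in b], [in_blocks])

    in_blocks.collect()                # "training" materializes the blocks
    if persist_final:
        factors.persist()
        factors.collect()              # materialize before dropping inputs
    if drop_intermediates:
        in_blocks.unpersist()
        ratings.unpersist()

    factors.collect()                  # the caller uses the factors twice
    result = factors.collect()
    return result, in_blocks.compute_count


print(simulate(persist_final=True, drop_intermediates=True))    # ([3, 5, 7], 1)
print(simulate(persist_final=False, drop_intermediates=False))  # ([3, 5, 7], 1)
print(simulate(persist_final=False, drop_intermediates=True))   # ([3, 5, 7], 3)
```

The second number in each result is how often the intermediate blocks were
computed. In the last case (final factors not persisted, intermediates
dropped) every use of the factors recomputes the whole chain, which is the
situation the NONE check is meant to avoid.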

Best,
Xiangrui

On Wed, Jul 22, 2015 at 11:35 AM, Ganelin, Ilya
<ilya.gane...@capitalone.com> wrote:
> To be unpersisted, an RDD must be persisted first. If the storage level is
> set to NONE, then the RDD is not persisted and so does not need to be freed.
> Does that make sense?
>
>
>
> Thank you,
> Ilya Ganelin
>
>
>
>
> -----Original Message-----
> From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
> Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
> To: user@spark.apache.org
> Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
>
> Hello again,
>
> In trying to understand the caching of intermediate RDDs by ALS, I looked
> into the source code and found what may be a bug.  Looking here:
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230
>
> you see that ALS.train() is being called with finalRDDStorageLevel =
> StorageLevel.NONE, which I would understand to mean that the intermediate
> RDDs will not be persisted.  Looking here:
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631
>
> unpersist() is only being called on the intermediate RDDs (all the *Blocks
> RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.
>
> This doesn’t make sense to me – I would expect the RDDs to be removed from
> the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way
> around.
>
> Jonathan
>
>
> From: <Stahlman>, Stahlman Jonathan <jonathan.stahl...@capitalone.com>
> Date: Thursday, July 16, 2015 at 2:18 PM
> To: "user@spark.apache.org" <user@spark.apache.org>
> Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
>
> Hello all,
>
> I am running the Spark recommendation algorithm in MLlib and I have been
> studying its output with various model configurations.  Ideally I would like
> to be able to run one job that trains the recommendation model with many
> different configurations to try to optimize for performance.  Sample code
> in Python is copied below.
>
> The issue I have is that each newly trained model caches a set of RDDs,
> and eventually the executors run out of memory.  Is there any way in
> PySpark to unpersist() these RDDs after each iteration?  The names of the
> RDDs, which I gathered from the UI, are:
>
> itemInBlocks
> itemOutBlocks
> Products
> ratingBlocks
> userInBlocks
> userOutBlocks
> users
>
> I am using Spark 1.3.  Thank you for any help!
>
> Regards,
> Jonathan
>
>
>
>
>   # imports needed by this excerpt
>   import itertools
>   from pyspark.mllib.recommendation import ALS, Rating
>
>   data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
>   functions = [rating] #defined elsewhere
>   ranks = [10,20]
>   iterations = [10,20]
>   lambdas = [0.01,0.1]
>   alphas  = [1.0,50.0]
>
>   results = []
>   for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ):
>     #train model
>     ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
>     model   = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) )
>
>     #test performance on CV data (areaUnderCurve is defined elsewhere)
>     ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
>     auc = areaUnderCurve( ratings_cv, model.predictAll )
>
>     #save results
>     result = ",".join(str(l) for l in [ratingFunction.__name__,rank,numIterations,m_lambda,m_alpha,auc])
>     results.append(result)
>
> ________________________________
>
> The information contained in this e-mail is confidential and/or proprietary
> to Capital One and/or its affiliates and may only be used solely in
> performance of work or services for Capital One. The information transmitted
> herewith is intended only for use by the individual or entity to which it is
> addressed. If the reader of this message is not the intended recipient, you
> are hereby notified that any review, retransmission, dissemination,
> distribution, copying or other use of, or taking of any action in reliance
> upon this information is strictly prohibited. If you have received this
> communication in error, please contact the sender and delete the material
> from your computer.
>

