dciborow created SPARK-23810:
--------------------------------

             Summary: Matrix Multiplication is so bad, file I/O to local python is better
                 Key: SPARK-23810
                 URL: https://issues.apache.org/jira/browse/SPARK-23810
             Project: Spark
          Issue Type: Bug
          Components: MLlib
    Affects Versions: 2.2.0
            Reporter: dciborow


I am trying to multiply two matrices. One is 130k by 30k; the second is 30k by 30k. (Stored dense, the 130k-by-30k operand alone is roughly 130,000 x 30,000 x 8 bytes ≈ 31 GB, as is the product.)

Running this leads to heartbeat timeouts, Java heap space errors, and garbage collection errors.

{{rdd.toBlockMatrix.multiply(rightRdd.toBlockMatrix).toIndexedRowMatrix()}}
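
For reference, toBlockMatrix accepts explicit block dimensions, and in 2.2 BlockMatrix.multiply also takes a numMidDimSplits argument that spreads the inner dimension across more tasks. A sketch of the tuning I would try, assuming rdd and rightRdd are CoordinateMatrix instances (the 512 block size and split count of 4 are guesses, not verified settings):

// Smaller blocks than the 1024 x 1024 default mean more, smaller tasks.
val left = rdd.toBlockMatrix(512, 512)       // 130k x 30k
val right = rightRdd.toBlockMatrix(512, 512) // 30k x 30k

// numMidDimSplits = 4 splits the shared 30k dimension to cut per-task memory.
val product = left.multiply(right, 4).toIndexedRowMatrix()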

I have also tried the following, which fails on the toLocalMatrix call.

val userMatrix = new CoordinateMatrix(userRDD).toIndexedRowMatrix()
val itemMatrix = new CoordinateMatrix(itemRDD).toBlockMatrix().toLocalMatrix()

val itemMatrixBC = session.sparkContext.broadcast(itemMatrix)
val userToItemMatrix = userMatrix
  .multiply(itemMatrixBC.value)
  .rows.map(row => (row.index.toInt, row.vector))

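Note that the 30k x 30k item matrix is about 7.2 GB dense (30,000 x 30,000 x 8 bytes), so toLocalMatrix and the broadcast each need at least that much driver memory, which is probably why the collect fails. If the broadcast does succeed, a minimal sketch that consumes it row by row on the executors via the mllib local-algebra API (untested at this scale):

val userToItem = userMatrix.rows.map { row =>
  // Resolve the broadcast on the executor. Since u * B == (B^T u)^T,
  // multiply each row vector through the transpose; transposing a
  // DenseMatrix is lazy (it only flips a flag).
  val itemT = itemMatrixBC.value.transpose
  (row.index.toInt, itemT.multiply(row.vector.toDense))
}
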
Instead, I have gotten this operation "working" by saving the input DataFrames to Parquet (they start as DataFrames before the .rdd call that adapts them to the matrix types), loading them into Python/pandas, doing the matrix multiplication with NumPy, saving the result back to Parquet, and rereading it into Spark.

Python -

import pandas as pd
import numpy as np

# Load the item vectors and stack the per-row lists into a dense 2-D array,
# indexed by itemID.
X = pd.read_parquet('./items-parquet', engine='pyarrow')
Xp = pd.DataFrame(np.stack(X.jaccardList), index=X.itemID)

# Reindex over the full 0..max(itemID) range, filling missing items with zero
# rows so the dimensions line up for the multiply.
Xrows = pd.DataFrame(index=range(0, X.itemID.max() + 1))
Xpp = Xrows.join(Xp).fillna(0)

# Load the user vectors the same way.
Y = pd.read_parquet('./users-parquet', engine='pyarrow')
Yp = np.stack(Y.flatList)

# The (130k x 30k) @ (30k x 30k) product, computed locally by NumPy.
Z = np.matmul(Yp, Xpp)
Zp = pd.DataFrame(Z)
Zp.columns = list(map(str, Zp.columns))

# Pack each result row into a list column so Spark can read it back.
Zpp = pd.DataFrame()
Zpp['id'] = Zp.index
Zpp['ratings'] = Zp.values.tolist()

Zpp.to_parquet("sampleout.parquet", engine='pyarrow')

Scala -

import sys.process._
val result = "python matmul.py".!
val pythonOutput = userDataFrame.sparkSession.read.parquet("./sampleout.parquet")
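
To get the result back into the matrix types, something like the following should work (a sketch; it assumes the id column reads back as a long and ratings as an array of doubles, which is what the pandas output above should produce):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Rebuild a distributed matrix from the (id, ratings) rows written by Python.
val resultMatrix = new IndexedRowMatrix(pythonOutput.rdd.map { r =>
  IndexedRow(r.getAs[Long]("id"), Vectors.dense(r.getAs[Seq[Double]]("ratings").toArray))
})
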
I can provide code and data to reproduce, but I could use some instructions on how to set that up. This is based on the MovieLens 20M dataset, or I can provide access to my data in Azure.