After following a tutorial on Recommender systems using Pyspark / Spark ML.  I 
decided to jump in with my own dataset.  I am specifically trying to predict 
video suggestions based on an implicit feature for the time a video was 
watched.  I wrote a generator to produce my dataset.  I have a total of five 
videos each 1200 seconds in length.  I randomly selected which videos a user 
watched and a random time between 0-1200.  I generated 10k records.  Weight is 
the time watched feature.  It looks a like this.

UserId,VideoId,Weight
0,1,645
0,2,870
0,3,1075
0,4,486
0,5,900
1,1,353
1,2,988
1,3,152
1,4,953
1,5,641
2,3,12
2,4,444
2,5,87
3,2,658
3,4,270
3,5,530
4,2,722
4,3,255
:

After reading the dataset.  I convert all columns to Integer in place.  
Describing Weight produces:

   summary  Weight
0 count       30136
1 mean       597.717945314574
2 stddev     346.475684454489
3 min          0
4 max         1200

Next, I standardized the weight column by:

df = dataset.select(mean('Weight').alias('mean_weight'), 
stddev('Weight').alias('stddev_weight')).crossJoin(dataset).withColumn('weight_scaled',
 (col('Weight') - col('mean_weight')) / col('stddev_weight'))

df.toPandas().head() shows:

   mean_weight  stddev_weight  UserId  VideoId  Weight  weight_scaled
0  597.717945   346.475684        0            1             645        0.136466
1  597.717945   346.475684        0            2             870        0.785862
2  597.717945   346.475684        0            3            1075       1.377534
3  597.717945   346.475684        0            4            486         
-0.322441
4  597.717945   346.475684        0            5            900         0.872448
:
10 597.717945   346.475684       2           3             12          -1.690502
11 597.717945   346.475684       2           4             444        -0.443662
12 597.717945   346.475684       2           5             87           
-1.474037
:

After splitting df 80 / 20 to get training / testing

I defined the ALS algo with:

als = ALS(maxIter=10, regParam=0.1, userCol='UserId', itemCol='VideoId', 
implicitPrefs=True, ratingCol='weight_scaled', coldStartStrategy='drop')

and then

model = als.fit(trainingData)

Calling fit() is where I get the following error, I don't understand.

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-12-20463d9db24b> in <module>
----> 1 model = als.fit(trainingData)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, 
dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a 
list/tuple of param maps, "

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in 
_fit(self, dataset)
    286
    287     def _fit(self, dataset):
--> 288         java_model = self._fit_java(dataset)
    289         model = self._create_model(java_model)
    290         return self._copyValues(model)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in 
_fit_java(self, dataset)
    283         """
    284         self._transfer_params_to_java()
--> 285         return self._java_obj.fit(dataset._jdf)
    286
    287     def _fit(self, dataset):

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py
 in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161
   1162         for temp_arg in temp_args:

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in 
deco(*a, **kw)
    61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py
 in get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o211.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in 
stage 61.0 failed 1 times, most recent failure: Lost task 5.0 in stage 61.0 
(TID 179, localhost, executor driver): 
org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 
because A is not positive definite. Is A derived from a singular matrix (e.g. 
collinear column values)?
                at 
org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
                at 
org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
                at 
org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
                at 
org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699)
                at 
org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660)
                at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
                at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
                at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
                at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
                at 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
                at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
                at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
                at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
                at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
                at org.apache.spark.scheduler.Task.run(Task.scala:109)
                at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
                at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
                at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
                at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
                at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
                at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
                at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
                at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
                at scala.Option.foreach(Option.scala:257)
                at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
                at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
                at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
                at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
                at 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
                at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
                at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118)
                at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
                at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
                at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
                at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111)
                at 
org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1711)
                at 
org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1652)
                at 
org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:972)
                at 
org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:969)
                at scala.collection.immutable.Range.foreach(Range.scala:160)
                at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:969)
                at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at py4j.Gateway.invoke(Gateway.java:282)
                at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:214)
                at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv 
returned 6 because A is not positive definite. Is A derived from a singular 
matrix (e.g. collinear column values)?
                at 
org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
                at 
org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
                at 
org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
                at 
org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699)
                at 
org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660)
                at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
                at 
org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
                at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
                at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
                at 
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
                at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
                at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
                at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
                at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
                at org.apache.spark.scheduler.Task.run(Task.scala:109)
                at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
                at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                ... 1 more

Thanks in advance.

-S

Reply via email to