After following a tutorial on recommender systems using PySpark / Spark ML, I decided to jump in with my own dataset. I am specifically trying to predict video suggestions based on an implicit feature: the length of time a video was watched. I wrote a generator to produce my dataset. I have a total of five videos, each 1200 seconds in length. I randomly selected which videos a user watched and a random watch time between 0 and 1200 seconds. I generated 10k records. Weight is the time-watched feature. It looks like this:
UserId,VideoId,Weight 0,1,645 0,2,870 0,3,1075 0,4,486 0,5,900 1,1,353 1,2,988 1,3,152 1,4,953 1,5,641 2,3,12 2,4,444 2,5,87 3,2,658 3,4,270 3,5,530 4,2,722 4,3,255 : After reading the dataset, I convert all columns to Integer in place. Describing Weight produces: summary Weight 0 count 30136 1 mean 597.717945314574 2 stddev 346.475684454489 3 min 0 4 max 1200 Next, I standardized the Weight column with: df = dataset.select(mean('Weight').alias('mean_weight'), stddev('Weight').alias('stddev_weight')).crossJoin(dataset).withColumn('weight_scaled', (col('Weight') - col('mean_weight')) / col('stddev_weight')) df.toPandas().head() shows: mean_weight stddev_weight UserId VideoId Weight weight_scaled 0 597.717945 346.475684 0 1 645 0.136466 1 597.717945 346.475684 0 2 870 0.785862 2 597.717945 346.475684 0 3 1075 1.377534 3 597.717945 346.475684 0 4 486 -0.322441 4 597.717945 346.475684 0 5 900 0.872448 : 10 597.717945 346.475684 2 3 12 -1.690502 11 597.717945 346.475684 2 4 444 -0.443662 12 597.717945 346.475684 2 5 87 -1.474037 : After splitting df 80 / 20 into training / testing sets, I defined the ALS algorithm with: als = ALS(maxIter=10, regParam=0.1, userCol='UserId', itemCol='VideoId', implicitPrefs=True, ratingCol='weight_scaled', coldStartStrategy='drop') and then model = als.fit(trainingData) Calling fit() is where I get the following error, which I don't understand:
Py4JJavaError Traceback (most recent call last) <ipython-input-12-20463d9db24b> in <module> ----> 1 model = als.fit(trainingData) C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params) 130 return self.copy(params)._fit(dataset) 131 else: --> 132 return self._fit(dataset) 133 else: 134 raise ValueError("Params must be either a param map or a list/tuple of param maps, " C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset) 286 287 def _fit(self, dataset): --> 288 java_model = self._fit_java(dataset) 289 model = self._create_model(java_model) 290 return self._copyValues(model) C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset) 283 """ 284 self._transfer_params_to_java() --> 285 return self._java_obj.fit(dataset._jdf) 286 287 def _fit(self, dataset): C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py in __call__(self, *args) 1158 answer = self.gateway_client.send_command(command) 1159 return_value = get_return_value( -> 1160 answer, self.gateway_client, self.target_id, self.name) 1161 1162 for temp_arg in temp_args: C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw) 61 def deco(*a, **kw): 62 try: ---> 63 return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e: 65 s = e.java_exception.toString() C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in get_return_value(answer, gateway_client, target_id, name) 318 raise Py4JJavaError( 319 "An error occurred while calling {0}{1}{2}.\n". --> 320 format(target_id, ".", name), value) 321 else: 322 raise Py4JError( Py4JJavaError: An error occurred while calling o211.fit. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 61.0 failed 1 times, most recent failure: Lost task 5.0 in stage 61.0 (TID 179, localhost, executor driver): org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)? at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65) at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41) at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335) at org.apache.spark.rdd.RDD.iterator(RDD.scala:286) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124) at 
org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111) at org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1711) at org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1652) at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:972) at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:969) at scala.collection.immutable.Range.foreach(Range.scala:160) at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:969) at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)? 
at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65) at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41) at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699) at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092) at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083) at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083) at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335) at org.apache.spark.rdd.RDD.iterator(RDD.scala:286) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Thanks in advance. -S