Could you post the code that has the problem with PySpark? Thanks! Davies
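(For reference, a minimal self-contained version of the repro from the quoted thread below would look something like this. It is only a sketch: the master URL and app name are assumptions, while the ratings and the trainImplicit arguments are taken verbatim from Gen's report.)

    # Minimal repro sketch, assuming Spark 1.1.0 and a local master.
    from pyspark import SparkContext
    from pyspark.mllib.recommendation import ALS

    sc = SparkContext("local[2]", "als-implicit-repro")

    # Ratings taken from the Python API documentation example in the report.
    r1 = (1, 1, 1.0)
    r2 = (1, 2, 2.0)
    r3 = (2, 1, 2.0)
    ratings = sc.parallelize([r1, r2, r3])

    # Reported to work: rank=1, iterations=1.
    model = ALS.trainImplicit(ratings, 1, 1)

    # Reported to fail with the Kryo ArrayStoreException: iterations > 1.
    model = ALS.trainImplicit(ratings, 1, 2)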
On Thu, Oct 16, 2014 at 12:27 PM, Gen <gen.tan...@gmail.com> wrote:
> I tried the same data with Scala and it works fine, so it seems to be a
> problem specific to PySpark. The console shows the following logs:
>
> Traceback (most recent call last):
>   File "<stdin>", line 1, in <module>
>   File "/root/spark/python/pyspark/mllib/recommendation.py", line 76, in trainImplicit
>     ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
>   File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 4.3 in stage 975.0 (TID 1653, ip-172-31-35-240.ec2.internal): TaskKilled (killed intentionally)
> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 8.2 in stage 975.0 (TID 1650, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
> py4j.protocol.Py4JJavaError: An error occurred while calling o32.trainImplicitALSModel.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 975.0 failed 4 times, most recent failure: Lost task 6.3 in stage 975.0 (TID 1651, ip-172-31-35-237.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>     com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>     com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>     com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>     com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>     org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>     org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>     org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>     org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>     org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>     scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>     scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>     org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>     org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>     org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>     org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>     org.apache.spark.scheduler.Task.run(Task.scala:54)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>     at scala.Option.foreach(Option.scala:236)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 14/10/16 19:22:44 WARN scheduler.TaskSetManager: Lost task 18.2 in stage 975.0 (TID 1652, ip-172-31-35-241.ec2.internal): TaskKilled (killed intentionally)
> 14/10/16 19:22:44 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 975.0, whose tasks have all completed, from pool
>
>
> Gen wrote:
>> Hi,
>>
>> I am trying to use the ALS.trainImplicit method in
>> pyspark.mllib.recommendation, but it didn't work. So I tried the
>> example from the Python API documentation:
>>
>>     r1 = (1, 1, 1.0)
>>     r2 = (1, 2, 2.0)
>>     r3 = (2, 1, 2.0)
>>     ratings = sc.parallelize([r1, r2, r3])
>>     model = ALS.trainImplicit(ratings, 1)
>>
>> That didn't work either. After searching on Google, I found that there
>> are only two overloads for ALS.trainImplicit in the Scala source, so I
>> tried
>>
>>     model = ALS.trainImplicit(ratings, 1, 1)
>>
>> and it worked. But if I set iterations to anything other than 1, for
>> example
>>
>>     model = ALS.trainImplicit(ratings, 1, 2)
>>
>> or
>>
>>     model = ALS.trainImplicit(ratings, 4, 2)
>>
>> it raised an error.
>> The information is as follows:
>>
>> count at ALS.scala:314
>>
>> Job aborted due to stage failure: Task 6 in stage 189.0 failed 4 times, most recent failure: Lost task 6.3 in stage 189.0 (TID 626, ip-172-31-35-239.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
>> Serialization trace:
>> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>>     [stack frames identical to the trace quoted above, from
>>     com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read
>>     through java.lang.Thread.run(Thread.java:745)]
>> Driver stacktrace:
>>
>> It is really strange, because count at ALS.scala:314 is already outside
>> the iteration loop. Any ideas?
>> Thanks a lot in advance.
>>
>> FYI: I am using Spark 1.1.0, and ALS.train() works fine in all these
>> cases.
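(Editorial note: every frame in the failing trace goes through KryoDeserializationStream, so one diagnostic worth trying, an assumption rather than a confirmed fix, is to rerun the failing case with Spark's default Java serializer and see whether the ArrayStoreException disappears. A sketch, assuming the cluster currently sets spark.serializer to the Kryo serializer; the app name is made up, and spark.serializer and org.apache.spark.serializer.JavaSerializer are standard Spark settings.)

    # Workaround sketch (unverified): force the default Java serializer to
    # test whether the failure is specific to Kryo deserialization.
    from pyspark import SparkConf, SparkContext
    from pyspark.mllib.recommendation import ALS

    conf = (SparkConf()
            .setAppName("als-implicit-javaser")
            .set("spark.serializer",
                 "org.apache.spark.serializer.JavaSerializer"))
    sc = SparkContext(conf=conf)

    # The previously failing case: iterations > 1.
    ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])
    model = ALS.trainImplicit(ratings, 1, 2)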
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org