[ https://issues.apache.org/jira/browse/SPARK-3990?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiangrui Meng closed SPARK-3990.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0
                   1.1.1

This is fixed by reverting the default SerDe for PySpark in SPARK-2652.

> kryo.KryoException caused by ALS.trainImplicit in pyspark
> ---------------------------------------------------------
>
>                 Key: SPARK-3990
>                 URL: https://issues.apache.org/jira/browse/SPARK-3990
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark
>    Affects Versions: 1.1.0
>         Environment: 5-slave cluster (m3.large) in AWS launched by spark-ec2
>                      Linux
>                      Python 2.6.8
>            Reporter: Gen TANG
>            Assignee: Xiangrui Meng
>            Priority: Critical
>              Labels: test
>             Fix For: 1.1.1, 1.2.0
>
>
> When we run ALS.trainImplicit() in PySpark, it only works for iterations = 1.
> Stranger still, the same code works well in Scala (in the several tests I
> have run so far, ALS.trainImplicit has always worked there).
> For example, the following code:
> {code:title=test.py|borderStyle=solid}
> from pyspark.mllib.recommendation import ALS
>
> # sc: an existing SparkContext (e.g. the one the pyspark shell provides)
> r1 = (1, 1, 1.0)
> r2 = (1, 2, 2.0)
> r3 = (2, 1, 2.0)
> ratings = sc.parallelize([r1, r2, r3])
> # rank = 1; iterations defaults to 5
> model = ALS.trainImplicit(ratings, 1)
> # or, with iterations given explicitly: model = ALS.trainImplicit(ratings, 1, 2)
> {code}
> It fails the stage "count at ALS.scala:314", with the following info:
> {code:title=error information provided by ganglia}
> Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, most recent failure: Lost task 6.3 in stage 90.0 (TID 484, ip-172-31-35-238.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>         com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>         scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> {code}
> The log of the slave that failed the task contains:
> {code:title=error information in the log of slave}
> 14/10/17 13:20:54 ERROR executor.Executor: Exception in task 6.0 in stage 90.0 (TID 465)
> com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>         at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         at org.apache.spark.scheduler.Task.run(Task.scala:54)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>         ... 36 more
> {code}
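
Since the resolution attributes the failure to the default SerDe chosen for PySpark, users who cannot yet upgrade past 1.1.0 might try pinning the serializer explicitly. The following is a minimal, untested sketch. It assumes that an explicitly set `spark.serializer` takes precedence over PySpark's built-in default in 1.1.0, and that Java serialization avoids the Kryo read error shown in the traces; neither is confirmed in this issue. `serializer_override` and `build_conf` are hypothetical helper names, not Spark APIs.

```python
# Hypothetical workaround sketch; the helper names are illustrative, not Spark APIs.
# Assumption: an explicit spark.serializer setting overrides PySpark's default.
JAVA_SERIALIZER = "org.apache.spark.serializer.JavaSerializer"


def serializer_override():
    """Return config pairs that pin the JVM-side serializer to Java serialization."""
    return [("spark.serializer", JAVA_SERIALIZER)]


def build_conf():
    """Apply the override to a SparkConf (requires a PySpark installation)."""
    # Imported lazily so this sketch can be loaded without Spark on the path.
    from pyspark import SparkConf
    return SparkConf().setAll(serializer_override())


if __name__ == "__main__":
    # Print the settings in key=value form for inspection.
    for key, value in serializer_override():
        print("%s=%s" % (key, value))
```

The resulting conf would be passed as `SparkContext(conf=build_conf())` before calling `ALS.trainImplicit`. Upgrading to 1.1.1 or 1.2.0, where the fix landed, remains the documented resolution.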