[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930827#comment-16930827 ]

Liang-Chi Hsieh commented on SPARK-28927:

Regarding the unstable AUC issue: nondeterministic training data, even when it does not cause an ArrayIndexOutOfBoundsException, can also cause wrong matching when computing factors. I don't have evidence that this is the cause, but it is possible.

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
> Issue Type: Bug
> Components: ML
> Affects Versions: 2.2.1
> Reporter: Qiang Wang
> Assignee: Liang-Chi Hsieh
> Priority: Major
> Attachments: image-2019-09-02-11-55-33-596.png
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block rdd_10916_493 could not be removed as it was not found on disk or in memory
> 19/08/28 07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074)
> java.lang.ArrayIndexOutOfBoundsException: 6741
> at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
> at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened intermittently. We also found that the AUC metric was not stable when evaluating the inner product of the user factors and the item factors with the same dataset and configuration. AUC varied from 0.60 to 0.67, which is not stable enough for a production environment.
> Dataset capacity: ~12 billion ratings
> Here is our code:
> {code:java}
> val hivedata = sc.sql(sqltext).select("id", "dpid", "score", "tag")
> .repartitio
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930658#comment-16930658 ]

Liang-Chi Hsieh commented on SPARK-28927:

You are using 2.2.1. spark.sql.execution.sortBeforeRepartition was introduced to fix the shuffle + repartition issue on DataFrames. The nondeterministic behavior can be triggered when a repartition call follows a shuffle. I noticed that you call repartition after reading the data with a SQL query; your sqltext may contain a shuffle operation. You can try checkpointing your training data before fitting the ALS model, to make the data deterministic.
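
To make the repartition-after-shuffle concern above concrete, here is a minimal plain-Scala sketch. It is a toy model, not Spark's implementation: `roundRobin` is a hypothetical helper that deals rows out to partitions in arrival order, and the two input sequences stand in for the same rows arriving in a different order on a retried task. It only illustrates why sorting the input first (or freezing it, as a checkpoint would) makes the partition contents repeatable.

```scala
object RepartitionDeterminismSketch {
  // Hypothetical helper: deal rows out to partitions in arrival order.
  // This is a simplification for illustration, not Spark's actual code.
  def roundRobin[T](rows: Seq[T], numPartitions: Int): Map[Int, Seq[T]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, rs) => p -> rs.map(_._1) }

  def main(args: Array[String]): Unit = {
    // The same six rows in two arrival orders (assumed, for illustration).
    val firstAttempt = Seq("a", "b", "c", "d", "e", "f")
    val retryAttempt = Seq("b", "a", "d", "c", "f", "e")

    // Without sorting, each row's partition depends on arrival order.
    val unsortedMatch =
      roundRobin(firstAttempt, 2) == roundRobin(retryAttempt, 2)
    println(s"same partitions without sorting: $unsortedMatch")

    // Sorting first removes the dependence on arrival order.
    val sortedMatch =
      roundRobin(firstAttempt.sorted, 2) == roundRobin(retryAttempt.sorted, 2)
    println(s"same partitions after sorting: $sortedMatch")
  }
}
```

In this toy model, a consumer that recorded per-partition indices on the first attempt would look at different rows after a recomputation, which is the kind of mismatch the comment suspects.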
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930314#comment-16930314 ]

Qiang Wang commented on SPARK-28927:

As the code above shows, I cannot see any non-deterministic operations.

> Here is our code:
> {code:java}
> val hivedata = sc.sql(sqltext).select("id", "dpid", "score", "tag")
> .repartition(6000).persist(StorageLevel.MEMORY_AND_DISK_SER)
> val zeroValueArrItem = ArrayBuffer[(String, Int)]()
> val predataItem = hivedata.
> map(r => (r.getString(0), (r.getString(1
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926814#comment-16926814 ]

Liang-Chi Hsieh commented on SPARK-28927:

Hi [~JerryHouse], do you use any non-deterministic operations when preparing your training dataset, such as sampling or filtering based on a random number?

> Here is our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
> .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)
> case class ALSData(user:Int, item:Int, rating:Float) extends
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923163#comment-16923163 ]

zhengruifeng commented on SPARK-28927:

[~JerryHouse] As for AUC, which implementation do you use: BinaryClassificationEvaluator or BinaryClassificationMetrics? If you use BinaryClassificationMetrics, you could try setting numBins=0 to avoid down-sampling; then we can see whether the score is stable. Also, could you please provide a (small) DataFrame that reproduces the problem?
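
As a cross-check on the evaluation side of the question above, the following plain-Scala sketch computes AUC exactly from (score, label) pairs, with no binning or down-sampling. It is not Spark's `BinaryClassificationMetrics`; `ExactAucSketch` and `auc` are hypothetical names, the sample data is made up, and the quadratic pairwise loop is only practical on a small sample. The point is that an exact AUC depends only on the set of pairs, so if it still varies between runs, the variation comes from the scores themselves.

```scala
object ExactAucSketch {
  // AUC as the fraction of (positive, negative) pairs in which the positive
  // scores higher, counting ties as one half. O(P * N); illustration only.
  def auc(scoreAndLabels: Seq[(Double, Double)]): Double = {
    val pos = scoreAndLabels.collect { case (s, l) if l > 0.5 => s }
    val neg = scoreAndLabels.collect { case (s, l) if l <= 0.5 => s }
    require(pos.nonEmpty && neg.nonEmpty, "need at least one example per class")
    val wins = for (p <- pos; n <- neg) yield
      if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size.toLong * neg.size)
  }

  def main(args: Array[String]): Unit = {
    // Made-up (score, label) pairs; label 1.0 is positive, 0.0 negative.
    val data = Seq((0.9, 1.0), (0.8, 0.0), (0.7, 1.0), (0.4, 0.0))
    println(auc(data))          // 3 of 4 pairs are ordered correctly
    println(auc(data.reverse))  // input order does not change the result
  }
}
```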
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920601#comment-16920601 ]

Qiang Wang commented on SPARK-28927:

I checked the commit list since May 31, 2017 (all commits before that date are included in 2.2.1) and found no commit related to this problem.

!image-2019-09-02-11-55-33-596.png!
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920596#comment-16920596 ]

Qiang Wang commented on SPARK-28927:

I only tested it on version 2.2.1, which is compatible with our Spark version. Is it OK to use MLlib from the master branch while keeping Spark at version 2.2.1?

> Here is our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
> .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)
> case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
> val ratingData = trainData.map(x => ALSDat
[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
[ https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920480#comment-16920480 ]

Liang-Chi Hsieh commented on SPARK-28927:

Does this only happen on 2.2.1? How about the current master branch?

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
> Issue Type: Bug
> Components: ML
> Affects Versions: 2.2.1
> Reporter: Qiang Wang
> Priority: Major
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block rdd_10916_493 could not be removed as it was not found on disk or in memory
> 19/08/28 07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074)
> java.lang.ArrayIndexOutOfBoundsException: 6741
> at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
> at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
> at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
> at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
> at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972)
> at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
> at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763)
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
> at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
> at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
> at org.apache.spark.scheduler.Task.run(Task.scala:108)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened intermittently. We also found that the AUC metric was not stable when evaluating the inner product of the user factors and the item factors with the same dataset and configuration. AUC varied from 0.60 to 0.67, which is not stable enough for a production environment.
> Dataset capacity: ~12 billion ratings
> Here is our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
>   .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)
> case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
> val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
> val als = new ALS
> val paramMap = ParamMap(als.alpha