[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-16 Thread Liang-Chi Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930827#comment-16930827
 ] 

Liang-Chi Hsieh commented on SPARK-28927:
-

Regarding to AUC unstable issue, the nondeterministic training data, if not 
causing ArrayIndexOutOfBoundsException, can also cause wrong matching in 
computing factors. I don't have evidence that it is the reason. But it is 
possible.

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Attachments: image-2019-09-02-11-55-33-596.png
>
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
>  Here is the our code:
> {code:java}
> val hivedata = sc.sql(sqltext).select("id", "dpid", "score", "tag")
> .repartitio

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-16 Thread Liang-Chi Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930658#comment-16930658
 ] 

Liang-Chi Hsieh commented on SPARK-28927:
-

Because you are using 2.2.1, spark.sql.execution.sortBeforeRepartition is 
introduced to fix shuffle + repartition issue on Dataframe. The 
nondeterministic behavior can be triggered when a repartition call following a 
shuffle. I noticed that you have repartition after you read data using a sql. 
Maybe your sqltext has a shuffle operation.

You can try to checkpoint your training data, before fitting ALS model, to make 
the data deterministic.

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Attachments: image-2019-09-02-11-55-33-596.png
>
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was n

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-16 Thread Qiang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930314#comment-16930314
 ] 

Qiang Wang commented on SPARK-28927:


 As the code above,  I cannot see any non-deterministic operations.

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Assignee: Liang-Chi Hsieh
>Priority: Major
> Attachments: image-2019-09-02-11-55-33-596.png
>
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
>  Here is the our code:
> {code:java}
> val hivedata = sc.sql(sqltext).select("id", "dpid", "score", "tag")
> .repartition(6000).persist(StorageLevel.MEMORY_AND_DISK_SER)
> val zeroValueArrItem = ArrayBuffer[(String, Int)]()
> val predataItem = hivedata.
>   map(r => (r.getString(0), (r.getString(1

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-10 Thread Liang-Chi Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16926814#comment-16926814
 ] 

Liang-Chi Hsieh commented on SPARK-28927:
-

Hi [~JerryHouse], do you use any non-deterministic operations when preparing 
your training dataset, like sample, filtering based on random number, etc.?

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Priority: Major
> Attachments: image-2019-09-02-11-55-33-596.png
>
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
> Here is the our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, 
> y._2.toFloat)))
>   .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)case class 
> ALSData(user:Int, item:Int, rating:Float) extends 

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-05 Thread zhengruifeng (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923163#comment-16923163
 ] 

zhengruifeng commented on SPARK-28927:
--

[~JerryHouse]  As to AUC, which impl do you use? BinaryClassificationEvaluator 
or BinaryClassificationMetrics?

If you use BinaryClassificationMetrics, you may try to set numBins=0 to avoid 
down-sampling, then we can see whether the score is stable.

Moreover, could you please provide a (small) dataframe to reproduce?

 

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Priority: Major
> Attachments: image-2019-09-02-11-55-33-596.png
>
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
> Here is the our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-01 Thread Qiang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920601#comment-16920601
 ] 

Qiang Wang commented on SPARK-28927:


I check the commit list since May 31, 2017 before which all the commits are 
included in 2.2.1  and find that there is  no relative commit about this 
problem.
!image-2019-09-02-11-55-33-596.png!

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Priority: Major
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
> Here is the our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, 
> y._2.toFloat)))
>   .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)case class 
> ALSData(user:Int, item:Int, rating:Float) extends Serializable
> val ratingDa

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-01 Thread Qiang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920596#comment-16920596
 ] 

Qiang Wang commented on SPARK-28927:


I only tested it on version 2.2.1 which is compatible to our spark version.  Is 
it ok to use mlib of the master brach while keeping the spark in the version 
2.2.1?

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Priority: Major
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
> Here is the our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, 
> y._2.toFloat)))
>   .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)case class 
> ALSData(user:Int, item:Int, rating:Float) extends Serializable
> val ratingData = trainData.map(x => ALSDat

[jira] [Commented] (SPARK-28927) ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets with 12 billion instances

2019-09-01 Thread Liang-Chi Hsieh (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-28927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920480#comment-16920480
 ] 

Liang-Chi Hsieh commented on SPARK-28927:
-

Does this only happen on 2.2.1? How about current master branch?

> ArrayIndexOutOfBoundsException and Not-stable AUC metrics in ALS for datasets 
> with 12 billion instances
> ---
>
> Key: SPARK-28927
> URL: https://issues.apache.org/jira/browse/SPARK-28927
> Project: Spark
>  Issue Type: Bug
>  Components: ML
>Affects Versions: 2.2.1
>Reporter: Qiang Wang
>Priority: Major
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 
> BlockManager: Block rdd_10916_493 could not be removed as it was not found on 
> disk or in memory 19/08/28 07:00:41 ERROR Executor task launch worker for 
> task 325074 Executor: Exception in task 3.0 in stage 347.1 (TID 325074) 
> java.lang.ArrayIndexOutOfBoundsException: 6741 at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
>  at 
> org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at 
> org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at 
> org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032) 
> at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) 
> at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
>  at 
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:381) at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at 
> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at 
> org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53) at 
> org.apache.spark.scheduler.Task.run(Task.scala:108) at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358) at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes.  And we also found that the AUC metric was 
> not stable when evaluating the inner product of the user factors and the item 
> factors with the same dataset and configuration. AUC varied from 0.60 to 0.67 
> which was not stable for production environment. 
> Dataset capacity: ~12 billion ratings
> Here is the our code:
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, 
> y._2.toFloat)))
>   .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)case class 
> ALSData(user:Int, item:Int, rating:Float) extends Serializable
> val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
> val als = new ALS
> val paramMap = ParamMap(als.alpha