Hi,

We're encountering an issue with training an LDA model in PySpark. The issue is as follows:
- Running LDA on a large set of documents (12M documents, ~2-5 kB each)
- Works fine for small subsets of the full set (100K - 1M documents)
- Hits a NullPointerException on the full data set
- Workload is running on Google Cloud Dataproc

The following two issues I was able to find online appear relevant:
https://issues.apache.org/jira/browse/SPARK-299 (which, as far as I can tell, has not been addressed)
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-LDA-throws-NullPointerException-td26686.html

I've also closely followed the code outlined here (a stripped-down sketch of our training call is included at the end of this message):
http://sean.lane.sh/blog/2016/PySpark_and_LDA

Any ideas or help would be appreciated! Thanks in advance,
Kevin

Example trace of output:

16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 8.0 in stage 42.0 (TID 16163, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): java.lang.NullPointerException
    at org.apache.spark.mllib.clustering.LDA$.computePTopic(LDA.scala:432)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer$$anonfun$5.apply(LDAOptimizer.scala:190)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer$$anonfun$5.apply(LDAOptimizer.scala:184)
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

[Stage 42:> (0 + 24) / 1000][Stage 43:=> (104 + 0) / 1000]
18/02/09 16:22:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 8 in stage 42.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 258, in <module>
    fire.Fire({'run_pipeline': run_pipeline})
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 120, in Fire
    component_trace = _Fire(component, args, context, name)
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 358, in _Fire
    component, remaining_args)
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 561, in _CallCallable
    result = fn(*varargs, **kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 77, in run_pipeline
    run_pipeline_local(pipeline_id, **kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 94, in run_pipeline_local
    pipeline.run(**kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/diseaseTools.zip/spark/pipelines/royal_lda.py", line 142, in run
"/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/clustering.py", >> line 1039, in train > > File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", >> line 130, in callMLlibFunc > > File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", >> line 123, in callJavaFunc > > File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", >> line 1133, in __call__ > > File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", >> line 319, in get_return_value > > py4j.protocol.Py4JJavaError: An error occurred while calling >> o161.trainLDAModel. > > : org.apache.spark.SparkException: Job aborted due to stage failure: Task >> 8 in stage 42.0 failed 4 times, most recent failure: Lost task 8.3 in stage >> 42.0 (TID 16221, royallda-20180209-152710-w-1.c.fathom-containers.internal, >> executor 4): java.lang.NullPointerException > > >> Driver stacktrace: > > at org.apache.spark.scheduler.DAGScheduler.org >> <http://org.apache.spark.scheduler.dagscheduler.org/>$apache$spark$ >> scheduler$DAGScheduler$$failJobAndIndependentStages( >> DAGScheduler.scala:1499) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( >> DAGScheduler.scala:1487) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply( >> DAGScheduler.scala:1486) > > at scala.collection.mutable.ResizableArray$class.foreach( >> ResizableArray.scala:59) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) > > at org.apache.spark.scheduler.DAGScheduler.abortStage( >> DAGScheduler.scala:1486) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$ >> handleTaskSetFailed$1.apply(DAGScheduler.scala:814) > > at org.apache.spark.scheduler.DAGScheduler$$anonfun$ >> handleTaskSetFailed$1.apply(DAGScheduler.scala:814) > > at scala.Option.foreach(Option.scala:257) > > at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed( >> DAGScheduler.scala:814) > > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop. >> doOnReceive(DAGScheduler.scala:1714) > > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop. >> onReceive(DAGScheduler.scala:1669) > > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop. >> onReceive(DAGScheduler.scala:1658) > > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) > > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119) > > at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1089) > > at org.apache.spark.rdd.RDDOperationScope$.withScope( >> RDDOperationScope.scala:151) > > at org.apache.spark.rdd.RDDOperationScope$.withScope( >> RDDOperationScope.scala:112) > > at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) > > at org.apache.spark.rdd.RDD.fold(RDD.scala:1083) > > at org.apache.spark.mllib.clustering.EMLDAOptimizer. >> computeGlobalTopicTotals(LDAOptimizer.scala:229) > > at org.apache.spark.mllib.clustering.EMLDAOptimizer. >> next(LDAOptimizer.scala:216) > > at org.apache.spark.mllib.clustering.EMLDAOptimizer. >> next(LDAOptimizer.scala:80) > > at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:336) > > at org.apache.spark.mllib.api.python.PythonMLLibAPI. 
    at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainLDAModel(PythonMLLibAPI.scala:552)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException

Waiting up to 5 seconds.
Sent all pending logs.
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 36.3 in stage 42.0 (TID 16224, royallda-20180209-152710-w-2.c.fathom-containers.internal, executor 5): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 14.1 in stage 42.0 (TID 16228, royallda-20180209-152710-w-5.c.fathom-containers.internal, executor 3): TaskKilled (stage cancelled)
[... roughly twenty more "Lost task ... TaskKilled (stage cancelled)" warnings for stages 42 and 43 trimmed for brevity ...]
18/02/09 16:22:55 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
18/02/09 16:22:55 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 43
18/02/09 16:22:56 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@1cd655d5{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
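
For reference, here is the stripped-down sketch of how we invoke the training step that I mentioned above. It is simplified from our actual pipeline (which closely follows the blog post linked earlier), so the variable names, the toy corpus, and the parameter values below are illustrative rather than our production settings:

    # Minimal sketch of our LDA training call; the corpus contents, k, and
    # maxIterations are placeholders, not the real 12M-document job.
    from pyspark import SparkContext
    from pyspark.mllib.clustering import LDA
    from pyspark.mllib.linalg import Vectors

    sc = SparkContext(appName="lda-npe-repro")

    # The corpus is an RDD of [document_id, term-count vector]; in the real
    # job these vectors are built from tokenized documents with a much
    # larger vocabulary.
    corpus = sc.parallelize([
        [0, Vectors.sparse(5, {0: 2.0, 3: 1.0})],
        [1, Vectors.sparse(5, {1: 1.0, 4: 3.0})],
    ]).cache()

    # optimizer="em" matches the EMLDAOptimizer frames in the trace above.
    model = LDA.train(corpus, k=2, maxIterations=10, optimizer="em")
    for topic in model.describeTopics(maxTermsPerTopic=3):
        print(topic)

This same call completes without error on the smaller subsets; the NullPointerException only appears once we feed in the full document set.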