Hi,

We're running into an issue when training an LDA model in PySpark. The
symptoms are as follows:

- Running LDA on a large set of documents (~12M documents, ~2-5 kB each)
- Training works fine on smaller subsets of the full set (100K - 1M documents)
- We hit a NullPointerException on the full data set
- The workload runs on Google Cloud Dataproc

The following two issues I was able to find online appear relevant:
https://issues.apache.org/jira/browse/SPARK-299 (which may not have been
addressed as far as I can tell)
http://apache-spark-user-list.1001560.n3.nabble.com/MLLIB-LDA-throws-NullPointerException-td26686.html

I've also closely followed the code outlined here:
http://sean.lane.sh/blog/2016/PySpark_and_LDA
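
For context, the training call looks roughly like the sketch below. This is
only a simplified illustration, not my exact pipeline code: the corpus
construction, names, and parameter values are placeholders. The real corpus is
an RDD of (document id, term-count vector) pairs built from the ~12M documents.

# Simplified sketch of the training step; names and values are illustrative only.
from pyspark import SparkContext
from pyspark.mllib.clustering import LDA
from pyspark.mllib.linalg import Vectors

sc = SparkContext(appName="lda-sketch")

# corpus: RDD of [doc_id, term-count vector]. In the real pipeline these sparse
# count vectors are built from the ~12M documents, not hard-coded like this.
corpus = sc.parallelize([
    [0, Vectors.sparse(5, {0: 2.0, 3: 1.0})],
    [1, Vectors.sparse(5, {1: 1.0, 4: 3.0})],
])

# "em" is the default optimizer and matches the EMLDAOptimizer / computePTopic
# frames in the trace below.
model = LDA.train(corpus, k=10, maxIterations=20, optimizer="em")
print(model.describeTopics(maxTermsPerTopic=5))

The NullPointerException in the trace below is raised from inside this
LDA.train() call.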

Any ideas or help would be much appreciated!

Thanks in advance,
Kevin

Example trace of output:

18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 8.0 in stage 42.0 (TID 16163, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): java.lang.NullPointerException
    at org.apache.spark.mllib.clustering.LDA$.computePTopic(LDA.scala:432)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer$$anonfun$5.apply(LDAOptimizer.scala:190)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer$$anonfun$5.apply(LDAOptimizer.scala:184)
    at org.apache.spark.graphx.impl.EdgePartition.aggregateMessagesEdgeScan(EdgePartition.scala:409)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:237)
    at org.apache.spark.graphx.impl.GraphImpl$$anonfun$13$$anonfun$apply$3.apply(GraphImpl.scala:207)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

[Stage 42:>             (0 + 24) / 1000][Stage 43:=>           (104 + 0) / 1000]18/02/09 16:22:55 ERROR org.apache.spark.scheduler.TaskSetManager: Task 8 in stage 42.0 failed 4 times; aborting job
Traceback (most recent call last):
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 258, in <module>
    fire.Fire({'run_pipeline': run_pipeline})
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 120, in Fire
    component_trace = _Fire(component, args, context, name)
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 358, in _Fire
    component, remaining_args)
  File "/usr/local/lib/python3.6/dist-packages/fire/core.py", line 561, in _CallCallable
    result = fn(*varargs, **kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 77, in run_pipeline
    run_pipeline_local(pipeline_id, **kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/pipeline_launcher.py", line 94, in run_pipeline_local
    pipeline.run(**kwargs)
  File "/tmp/61801514-d562-433b-ac42-faa758c27b63/diseaseTools.zip/spark/pipelines/royal_lda.py", line 142, in run
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/clustering.py", line 1039, in train
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 130, in callMLlibFunc
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/mllib/common.py", line 123, in callJavaFunc
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o161.trainLDAModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 42.0 failed 4 times, most recent failure: Lost task 8.3 in stage 42.0 (TID 16221, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): java.lang.NullPointerException
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1089)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1083)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer.computeGlobalTopicTotals(LDAOptimizer.scala:229)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer.next(LDAOptimizer.scala:216)
    at org.apache.spark.mllib.clustering.EMLDAOptimizer.next(LDAOptimizer.scala:80)
    at org.apache.spark.mllib.clustering.LDA.run(LDA.scala:336)
    at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainLDAModel(PythonMLLibAPI.scala:552)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
Waiting up to 5 seconds.
Sent all pending logs.
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 36.3 in stage 42.0 (TID 16224, royallda-20180209-152710-w-2.c.fathom-containers.internal, executor 5): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 14.1 in stage 42.0 (TID 16228, royallda-20180209-152710-w-5.c.fathom-containers.internal, executor 3): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 19.2 in stage 42.0 (TID 16231, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 4.2 in stage 42.0 (TID 16219, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 18.3 in stage 42.0 (TID 16229, royallda-20180209-152710-w-2.c.fathom-containers.internal, executor 5): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 10.3 in stage 42.0 (TID 16223, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 22.2 in stage 42.0 (TID 16222, royallda-20180209-152710-w-2.c.fathom-containers.internal, executor 5): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.1 in stage 42.0 (TID 16230, royallda-20180209-152710-w-3.c.fathom-containers.internal, executor 2): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 12.1 in stage 42.0 (TID 16226, royallda-20180209-152710-w-3.c.fathom-containers.internal, executor 2): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 38.3 in stage 42.0 (TID 16227, royallda-20180209-152710-w-4.c.fathom-containers.internal, executor 1): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 43.2 in stage 42.0 (TID 16225, royallda-20180209-152710-w-4.c.fathom-containers.internal, executor 1): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 46.1 in stage 42.0 (TID 16188, royallda-20180209-152710-w-0.c.fathom-containers.internal, executor 6): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 9.1 in stage 42.0 (TID 16214, royallda-20180209-152710-w-0.c.fathom-containers.internal, executor 6): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 53.1 in stage 42.0 (TID 16192, royallda-20180209-152710-w-0.c.fathom-containers.internal, executor 6): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.ExecutorAllocationManager: No stages are running, but numRunningTasks != 0
18/02/09 16:22:55 WARN org.apache.spark.ui.jobs.JobProgressListener: Task start for unknown stage 43
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 20.0 in stage 42.0 (TID 16205, royallda-20180209-152710-w-3.c.fathom-containers.internal, executor 2): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 35.1 in stage 42.0 (TID 16211, royallda-20180209-152710-w-2.c.fathom-containers.internal, executor 5): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.1 in stage 42.0 (TID 16209, royallda-20180209-152710-w-3.c.fathom-containers.internal, executor 2): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 16.1 in stage 42.0 (TID 16213, royallda-20180209-152710-w-1.c.fathom-containers.internal, executor 4): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 25.1 in stage 42.0 (TID 16204, royallda-20180209-152710-w-4.c.fathom-containers.internal, executor 1): TaskKilled (stage cancelled)
18/02/09 16:22:55 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 5.1 in stage 42.0 (TID 16208, royallda-20180209-152710-w-5.c.fathom-containers.internal, executor 3): TaskKilled (stage cancelled)
18/02/09 16:22:56 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 14.0 in stage 43.0 (TID 16232, royallda-20180209-152710-w-5.c.fathom-containers.internal, executor 3): TaskKilled (stage cancelled)
18/02/09 16:22:56 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 204.0 in stage 43.0 (TID 16233, royallda-20180209-152710-w-4.c.fathom-containers.internal, executor 1): TaskKilled (stage cancelled)
18/02/09 16:22:56 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 104.0 in stage 43.0 (TID 16234, royallda-20180209-152710-w-0.c.fathom-containers.internal, executor 6): TaskKilled (stage cancelled)
18/02/09 16:22:56 INFO org.spark_project.jetty.server.AbstractConnector: Stopped Spark@1cd655d5{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
