attilapiros opened a new pull request #32169:
URL: https://github.com/apache/spark/pull/32169


   
   ### What changes were proposed in this pull request?
   
   With this PR Spark avoids creating multiple monitor threads for the same 
worker and same task context.
   
   ### Why are the changes needed?
   
   Without this change unnecessary threads will be created. It even can cause 
job failure for example when a coalesce (without shuffle) from high partition 
number goes to very low one. This exception is exactly comes for such a run:
   
   ```
   py4j.protocol.Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 
in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 
(TID 0) (192.168.1.210 executor driver): java.lang.OutOfMemoryError: unable to 
create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at 
org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at scala.collection.AbstractIterator.to(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2260)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
   Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2262)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2211)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2210)
        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2210)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1083)
        at 
org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1083)
        at scala.Option.foreach(Option.scala:407)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1083)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2449)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2391)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2380)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:872)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2220)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2241)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2260)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2285)
        at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
        at 
org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
        at 
org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at 
org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:166)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at 
org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
        at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
        at scala.collection.AbstractIterator.to(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1429)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1429)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2260)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more
   ```
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Manually I used a the following Python script used 
(`reproduce-SPARK-35009.py`):
   
   ```
   import pyspark
   
   conf = pyspark.SparkConf().setMaster("local[*]").setAppName("Test1")
   sc = pyspark.SparkContext.getOrCreate(conf)
   
   rows = 70000
   data = list(range(rows))
   rdd = sc.parallelize(data, rows)
   assert rdd.getNumPartitions() == rows
   rdd0 = rdd.filter(lambda x: False)
   data = rdd0.coalesce(1).collect()
   assert data == []
   ```
   
   Spark submit:
   ```
   $ ./bin/spark-submit reproduce-SPARK-35009.py
   ```
   
   #### With this change
   
   Checking the number of monitor threads with jcmd:
   ```
   $ jcmd
   85273 sun.tools.jcmd.JCmd
   85227 org.apache.spark.deploy.SparkSubmit reproduce-SPARK-35009.py
   41020 scala.tools.nsc.MainGenericRunner
   $ jcmd 85227 Thread.print | grep -c "Monitor for python"
   2
   $ jcmd 85227 Thread.print | grep -c "Monitor for python"
   2
   ...
   $ jcmd 85227 Thread.print | grep -c "Monitor for python"
   2
   $ jcmd 85227 Thread.print | grep -c "Monitor for python"
   2
   $ jcmd 85227 Thread.print | grep -c "Monitor for python"
   2
   $ jcmd 85227 Thread.print | grep -c "Monitor for python"
   2
   ```
   
   #### Without this change
   
   ```
   ...
   $ jcmd 90052 Thread.print | grep -c "Monitor for python"                     
                                                                                
 [INSERT]
   5645
   ..
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to