Hi Sean!

So "coalesce" without a shuffle creates a CoalescedRDD which, during its computation, delegates to the parent RDD's partitions. As the CoalescedRDD contains only 1 partition, we are talking about 1 task and 1 task context.
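To make the "without shuffle" part concrete, here is a small PySpark sketch (assuming an existing SparkContext `sc`, as in your session below): coalesce(1) without a shuffle keeps everything in one task, while coalesce(1, shuffle=True) would insert a shuffle so the parent partitions are computed by their own upstream tasks.

    rdd = sc.parallelize(range(70000), 70000)   # 70000 parent partitions

    narrow = rdd.coalesce(1)                    # no shuffle: a single CoalescedRDD partition
    assert narrow.getNumPartitions() == 1       # -> 1 task computes all 70000 parent partitions

    shuffled = rdd.coalesce(1, shuffle=True)    # with a shuffle the parents run as separate upstream tasks
    assert shuffled.getNumPartitions() == 1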
The next stop is PythonRunner. Here the Python workers at least are reused (when "spark.python.worker.reuse" is true, which is the default), but the MonitorThreads are not reused and, what is worse, all the MonitorThreads are created for the same worker and the same TaskContext. This means the CoalescedRDD's single task has to complete before even the first monitor thread can stop; the relevant code is:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L570

So this leads to creating 70000 extra threads when 1 would be enough (a toy sketch of this thread pattern is at the bottom of this mail).

The jira is: https://issues.apache.org/jira/browse/SPARK-35009

The PR will come next week, maybe (I am a bit uncertain as I have many other things to do right now).

Best Regards,
Attila

On Fri, Apr 9, 2021 at 5:54 PM Sean Owen <sro...@gmail.com> wrote:

> Yeah I figured it's not something fundamental to the task or Spark. The
> error is very odd, never seen that. Do you have a theory on what's going
> on there? I don't!
>
> On Fri, Apr 9, 2021 at 10:43 AM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> I looked into the code and found a way to improve it.
>>
>> With the improvement your test runs just fine:
>>
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
>>       /_/
>>
>> Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
>> Spark context Web UI available at http://192.168.0.199:4040
>> Spark context available as 'sc' (master = local, app id = local-1617982367872).
>> SparkSession available as 'spark'.
>>
>> In [1]: import pyspark
>>
>> In [2]: conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>>
>> In [3]: sc=pyspark.SparkContext.getOrCreate(conf)
>>
>> In [4]: rows=70000
>>
>> In [5]: data=list(range(rows))
>>
>> In [6]: rdd=sc.parallelize(data,rows)
>>
>> In [7]: assert rdd.getNumPartitions()==rows
>>
>> In [8]: rdd0=rdd.filter(lambda x:False)
>>
>> In [9]: assert rdd0.getNumPartitions()==rows
>>
>> In [10]: rdd00=rdd0.coalesce(1)
>>
>> In [11]: data=rdd00.collect()
>> 21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
>> large size (4729 KiB). The maximum recommended task size is 1000 KiB.
>>
>> In [12]: assert data==[]
>>
>> In [13]:
>>
>> I will create a jira and need to add some unit tests before opening the PR.
>>
>> Best Regards,
>> Attila
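P.S. Here is the toy Python sketch mentioned above (not Spark's actual Scala code, just an illustration of the pattern): one monitor-like thread is started per parent partition, but all of them watch the same task, so none of them can exit before the single coalesced task finishes.

    # Toy sketch of the pattern (not Spark's code): one monitor thread per
    # parent partition, all waiting on the same "task completed" signal.
    import threading

    PARENT_PARTITIONS = 1000             # Spark would do 70000 here
    task_completed = threading.Event()   # stands in for the single TaskContext

    def monitor():
        # MonitorThread-like behaviour: it can only stop once the task is done
        task_completed.wait()

    monitors = [threading.Thread(target=monitor, daemon=True)
                for _ in range(PARENT_PARTITIONS)]
    for t in monitors:
        t.start()

    print(threading.active_count())      # ~PARENT_PARTITIONS live threads pile up
    task_completed.set()                 # only now can every monitor thread exit
    for t in monitors:
        t.join()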