Hi Sean!

So "coalesce" without shuffle creates a CoalescedRDD which, during its
computation, delegates to the parent RDD's partitions.
As the CoalescedRDD contains only 1 partition, there is only 1 task and 1
task context.
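To illustrate the delegation (this is a toy model, not Spark's actual
CoalescedRDD; the class and method names are made up):

```python
# Toy illustration (not Spark's real classes): a narrow coalesce produces
# one output partition whose computation simply iterates over all parent
# partitions, so only one task (and one task context) runs for it.
class ToyRDD:
    def __init__(self, partitions):
        self.partitions = partitions  # each partition is a plain list

    def coalesce_narrow(self, n):
        # Group the parent partitions under n output partitions
        # without redistributing data (i.e. no shuffle).
        groups = [[] for _ in range(n)]
        for i, part in enumerate(self.partitions):
            # computing output partition i % n delegates to this parent
            groups[i % n].extend(part)
        return ToyRDD(groups)

parent = ToyRDD([[x] for x in range(70000)])  # 70000 tiny partitions
child = parent.coalesce_narrow(1)
# one output partition now covers all 70000 parent partitions
```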

The next stop is PythonRunner.

Here the Python workers at least are reused (when
"spark.python.worker.reuse" is true, which is the default), but the
MonitorThreads are not reused, and what is worse, all the MonitorThreads
are created for the same worker and the same TaskContext.
This means the CoalescedRDD's single task must complete before the first
monitor thread can stop; relevant code:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L570

So this will lead to creating 70000 extra threads when 1 would be enough.

The jira is: https://issues.apache.org/jira/browse/SPARK-35009
The PR will come next week, maybe (I am a bit uncertain as I have many
other things to do right now).

Best Regards,
Attila

On Fri, Apr 9, 2021 at 5:54 PM Sean Owen <sro...@gmail.com> wrote:

> Yeah I figured it's not something fundamental to the task or Spark. The
> error is very odd, never seen that. Do you have a theory on what's going on
> there? I don't!
>
> On Fri, Apr 9, 2021 at 10:43 AM Attila Zsolt Piros <
> piros.attila.zs...@gmail.com> wrote:
>
>> Hi!
>>
>> I looked into the code and found a way to improve it.
>>
>> With the improvement your test runs just fine:
>>
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
>>       /_/
>>
>> Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
>> Spark context Web UI available at http://192.168.0.199:4040
>> Spark context available as 'sc' (master = local, app id =
>> local-1617982367872).
>> SparkSession available as 'spark'.
>>
>> In [1]:     import pyspark
>>
>> In [2]:
>> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>>
>> In [3]:     sc=pyspark.SparkContext.getOrCreate(conf)
>>
>> In [4]:     rows=70000
>>
>> In [5]:     data=list(range(rows))
>>
>> In [6]:     rdd=sc.parallelize(data,rows)
>>
>> In [7]:     assert rdd.getNumPartitions()==rows
>>
>> In [8]:     rdd0=rdd.filter(lambda x:False)
>>
>> In [9]:     assert rdd0.getNumPartitions()==rows
>>
>> In [10]:     rdd00=rdd0.coalesce(1)
>>
>> In [11]:     data=rdd00.collect()
>> 21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
>> large siz
>> e (4729 KiB). The maximum recommended task size is 1000 KiB.
>>
>> In [12]:     assert data==[]
>>
>> In [13]:
>>
>>
>> I will create a jira and need to add some unittest before opening the PR.
>>
>> Best Regards,
>> Attila
>>
