Hi!

I looked into the code and found a way to improve it.

With the improvement, your test runs just fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/

Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
Spark context Web UI available at http://192.168.0.199:4040
Spark context available as 'sc' (master = local, app id =
local-1617982367872).
SparkSession available as 'spark'.

In [1]:     import pyspark

In [2]:     conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")

In [3]:     sc=pyspark.SparkContext.getOrCreate(conf)

In [4]:     rows=70000

In [5]:     data=list(range(rows))

In [6]:     rdd=sc.parallelize(data,rows)

In [7]:     assert rdd.getNumPartitions()==rows

In [8]:     rdd0=rdd.filter(lambda x:False)

In [9]:     assert rdd0.getNumPartitions()==rows

In [10]:     rdd00=rdd0.coalesce(1)

In [11]:     data=rdd00.collect()
21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very
large size (4729 KiB). The maximum recommended task size is 1000 KiB.

In [12]:     assert data==[]

In [13]:


I will create a JIRA and add some unit tests before opening the PR.
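
A rough sketch of the kind of unit test I have in mind (the test name and
setup are placeholders until the JIRA exists):

    import pyspark

    def test_coalesce_many_empty_partitions():
        conf = pyspark.SparkConf().setMaster("local[4]").setAppName("Test1")
        sc = pyspark.SparkContext.getOrCreate(conf)
        rows = 70000
        # an RDD with 70000 partitions, all emptied by the filter
        rdd = sc.parallelize(list(range(rows)), rows).filter(lambda x: False)
        assert rdd.getNumPartitions() == rows
        # without the fix this collect exhausted driver memory;
        # with it, an empty list should come back
        assert rdd.coalesce(1).collect() == []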

Best Regards,
Attila

On Fri, Apr 9, 2021 at 7:04 AM Weiand, Markus, NMA-CFD <
markus.wei...@bertelsmann.de> wrote:

> I’ve changed the code to set driver memory to 100g; the changed Python code:
>
>     import pyspark
>
>
>     conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory", value="100g")
>
>     sc=pyspark.SparkContext.getOrCreate(conf)
>
>     rows=70000
>
>     data=list(range(rows))
>
>     rdd=sc.parallelize(data,rows)
>
>     assert rdd.getNumPartitions()==rows
>
>     rdd0=rdd.filter(lambda x:False)
>
>     assert rdd0.getNumPartitions()==rows
>
>     rdd00=rdd0.coalesce(1)
>
>     data=rdd00.collect()
>
>     assert data==[]
>
>
>
> The same error still happens:
>
>
>
> 21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very
> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough
> space' (errno=12)
>
> [423.701s][warning][os,thread] Attempt to protect stack guard pages failed
> (0x00007f4640d28000-0x00007f4640d2c000).
>
> [423.701s][warning][os,thread] Attempt to deallocate stack guard pages
> failed.
>
> [423.704s][warning][os,thread] Failed to start thread - pthread_create
> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>
> #
>
> # There is insufficient memory for the Java Runtime Environment to
> continue.
>
> # Native memory allocation (mmap) failed to map 16384 bytes for committing
> reserved memory.
>
>
>
> A function which needs 423 seconds to crash with excessive memory
> consumption when trying to coalesce 70000 empty partitions is not very
> practical. As I do not know the limits within which coalesce without
> shuffling can be used safely and with good performance, I will now always
> use coalesce with shuffling, even though in theory this comes with quite
> a performance penalty.
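>
> For reference, a minimal sketch of the shuffling variant
> (coalesce(1, True) is equivalent to repartition(1); the shuffle
> redistributes the rows instead of making a single task reference all
> 70000 parent partitions):
>
>     # insert a shuffle stage so the single output task only reads
>     # shuffle blocks rather than computing all parent partitions
>     rdd00=rdd0.coalesce(1,True)
>
>     data=rdd00.collect()
>
>     assert data==[]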
>
>
>
> Markus
>
>
>
> *From:* Russell Spitzer <russell.spit...@gmail.com>
> *Sent:* Thursday, April 8, 2021 15:24
> *To:* Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de>
> *Cc:* user@spark.apache.org
> *Subject:* Re: possible bug
>
>
>
> Could be that the driver JVM cannot handle the metadata required to store
> the partition information of a 70k-partition RDD. I see you say you have a
> 100GB driver, but I'm not sure where you configured that.
>
> Did you set --driver-memory 100G ?
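>
> Note that in local/client mode the driver JVM is already running by the
> time your SparkConf is applied, so spark.driver.memory set there is
> ignored; it has to be passed at launch. A minimal sketch for a script
> started with plain python rather than spark-submit (assuming a stock
> PySpark install):
>
>     import os
>
>     # must be set before the SparkContext (and with it the JVM) starts
>     os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 100g pyspark-shell"
>
>     import pyspark
>
>     conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>
>     sc=pyspark.SparkContext.getOrCreate(conf)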
>
>
>
> On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <
> markus.wei...@bertelsmann.de> wrote:
>
> This is the reduction of an error in a complex program where I allocated
> 100 GB of driver memory (= worker = executor, since it is local mode). In
> the example I used the default size, as the puny example shouldn’t need
> more anyway.
>
> And without the coalesce or with coalesce(1,True) everything works fine.
>
> I’m trying to coalesce an empty RDD with 70000 partitions into an empty
> RDD with 1 partition; why is this a problem without shuffling?
>
>
>
> *From:* Sean Owen <sro...@gmail.com>
> *Sent:* Thursday, April 8, 2021 15:00
> *To:* Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de>
> *Cc:* user@spark.apache.org
> *Subject:* Re: possible bug
>
>
>
> That's a very low-level error from the JVM. Any chance you are
> misconfiguring the executor size? Like 10MB instead of 10GB, that kind
> of thing. I'm trying to think of why the JVM would have very little
> memory to operate.
>
> An app running out of memory would not look like this.
>
>
>
> On Thu, Apr 8, 2021 at 7:53 AM Weiand, Markus, NMA-CFD <
> markus.wei...@bertelsmann.de> wrote:
>
> Hi all,
>
>
>
> I'm using Spark on a c5a.16xlarge machine in the Amazon cloud (so having 64
> cores and 128 GB RAM). I'm using Spark 3.0.1.
>
>
>
> The following python code leads to an exception, is this a bug or is my
> understanding of the API incorrect?
>
>
>
>     import pyspark
>
>     conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
>
>     sc=pyspark.SparkContext.getOrCreate(conf)
>
>     rows=70000
>
>     data=list(range(rows))
>
>     rdd=sc.parallelize(data,rows)
>
>     assert rdd.getNumPartitions()==rows
>
>     rdd0=rdd.filter(lambda x:False)
>
>     assert rdd0.getNumPartitions()==rows
>
>     rdd00=rdd0.coalesce(1)
>
>     data=rdd00.collect()
>
>     assert data==[]
>
>
>
> output when starting from PyCharm:
>
>
>
> /home/ubuntu/PycharmProjects/<projekt-dir>/venv/bin/python
> /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py
> --mode=client --port=41185
>
> import sys; print('Python %s on %s' % (sys.version, sys.platform))
>
> sys.path.extend(['/home/ubuntu/PycharmProjects/<projekt-dir>'])
>
> PyDev console: starting.
>
> Python 3.8.5 (default, Jan 27 2021, 15:41:15)
>
> [GCC 9.3.0] on linux
>
> import os
>
> os.environ['PYTHONHASHSEED'] = '0'
>
> runfile('/home/ubuntu/PycharmProjects/<projekt-dir>/tests/test.py',
> wdir='/home/ubuntu/PycharmProjects/<projekt-dir>/tests')
>
> WARNING: An illegal reflective access operation has occurred
>
> WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform
> (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor
> java.nio.DirectByteBuffer(long,int)
>
> WARNING: Please consider reporting this to the maintainers of
> org.apache.spark.unsafe.Platform
>
> WARNING: Use --illegal-access=warn to enable warnings of further illegal
> reflective access operations
>
> WARNING: All illegal access operations will be denied in a future release
>
> 21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> 21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very
> large size (4732 KiB). The maximum recommended task size is 1000 KiB.
>
> [Stage 0:>                                                          (0 +
> 1) / 1][423.190s][warning][os,thread] Attempt to protect stack guard pages
> failed (0x00007f43d23ff000-0x00007f43d2403000).
>
> [423.190s][warning][os,thread] Attempt to deallocate stack guard pages
> failed.
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x00007f43d300b000, 16384, 0) failed; error='Not enough
> space' (errno=12)
>
> [423.231s][warning][os,thread] Failed to start thread - pthread_create
> failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
>
> #
>
> # There is insufficient memory for the Java Runtime Environment to
> continue.
>
> # Native memory allocation (mmap) failed to map 16384 bytes for committing
> reserved memory.
>
> # An error report file with more information is saved as:
>
> # /home/ubuntu/PycharmProjects/<projekt-dir>/tests/hs_err_pid17755.log
>
> [thread 17966 also had an error]
>
> OpenJDK 64-Bit Server VM warning: INFO:
> os::commit_memory(0x00007f4b7bd81000, 262144, 0) failed; error='Not enough
> space' (errno=12)
>
> ERROR:root:Exception while sending command.
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1207, in send_command
>
>     raise Py4JNetworkError("Answer from Java side is empty")
>
> py4j.protocol.Py4JNetworkError: Answer from Java side is empty
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1033, in send_command
>
>     response = connection.send_command(command)
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1211, in send_command
>
>     raise Py4JNetworkError(
>
> py4j.protocol.Py4JNetworkError: Error while receiving
>
> ERROR:py4j.java_gateway:An error occurred while trying to connect to the
> Java server (127.0.0.1:42439)
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>
>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__
>
>     return_value = get_return_value(
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
> 334, in get_return_value
>
>     raise Py4JError(
>
> py4j.protocol.Py4JError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 977, in _get_connection
>
>     connection = self.deque.pop()
>
> IndexError: pop from an empty deque
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1115, in start
>
>     self.socket.connect((self.address, self.port))
>
> ConnectionRefusedError: [Errno 111] Connection refused
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>
>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1304, in __call__
>
>     return_value = get_return_value(
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line
> 334, in get_return_value
>
>     raise Py4JError(
>
> py4j.protocol.Py4JError: An error occurred while calling
> z:org.apache.spark.api.python.PythonRDD.collectAndServe
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 977, in _get_connection
>
>     connection = self.deque.pop()
>
> IndexError: pop from an empty deque
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1115, in start
>
>     self.socket.connect((self.address, self.port))
>
> ConnectionRefusedError: [Errno 111] Connection refused
>
> During handling of the above exception, another exception occurred:
>
> Traceback (most recent call last):
>
>   File "<input>", line 3, in <module>
>
>   File
> "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py",
> line 197, in runfile
>
>     pydev_imports.execfile(filename, global_vars, local_vars)  # execute
> the script
>
>   File
> "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py",
> line 18, in execfile
>
>     exec(compile(contents+"\n", file, 'exec'), glob, loc)
>
>   File
> "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py",
> line 992, in <module>
>
>     test_70000()
>
>   File
> "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py",
> line 974, in test_70000
>
>     data=rdd00.collect()
>
>   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
>
>     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
>
>   File "/opt/spark/python/pyspark/traceback_utils.py", line 78, in __exit__
>
>     self._context._jsc.setCallSite(None)
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1303, in __call__
>
>     answer = self.gateway_client.send_command(command)
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1031, in send_command
>
>     connection = self._get_connection()
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 979, in _get_connection
>
>     connection = self._create_connection()
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 985, in _create_connection
>
>     connection.start()
>
>   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py",
> line 1127, in start
>
>     raise Py4JNetworkError(msg, e)
>
> py4j.protocol.Py4JNetworkError: An error occurred while trying to connect
> to the Java server (127.0.0.1:42439)
>
>
>
>
>
> report of free -m:
>
>               total        used        free      shared  buff/cache   available
>
> Mem:         127462        5548       22680          92       99233      120605
>
> Swap:             0           0           0
>
>
>
> Thanks
>
> Markus
>
>
