Hi! I looked into the code and found a way to improve it.
With the improvement your test runs just fine:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.2.0-SNAPSHOT
      /_/

Using Python version 3.8.1 (default, Dec 30 2020 22:53:18)
Spark context Web UI available at http://192.168.0.199:4040
Spark context available as 'sc' (master = local, app id = local-1617982367872).
SparkSession available as 'spark'.

In [1]: import pyspark

In [2]: conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")

In [3]: sc=pyspark.SparkContext.getOrCreate(conf)

In [4]: rows=70000

In [5]: data=list(range(rows))

In [6]: rdd=sc.parallelize(data,rows)

In [7]: assert rdd.getNumPartitions()==rows

In [8]: rdd0=rdd.filter(lambda x:False)

In [9]: assert rdd0.getNumPartitions()==rows

In [10]: rdd00=rdd0.coalesce(1)

In [11]: data=rdd00.collect()
21/04/09 17:32:54 WARN TaskSetManager: Stage 0 contains a task of very large size (4729 KiB). The maximum recommended task size is 1000 KiB.

In [12]: assert data==[]

In [13]:

I will create a JIRA; I still need to add some unit tests before opening the PR.

Best Regards,
Attila

On Fri, Apr 9, 2021 at 7:04 AM Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de> wrote:

> I've changed the code to set the driver memory to 100g; the changed python code:
>
> import pyspark
> conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1").set(key="spark.driver.memory", value="100g")
> sc=pyspark.SparkContext.getOrCreate(conf)
> rows=70000
> data=list(range(rows))
> rdd=sc.parallelize(data,rows)
> assert rdd.getNumPartitions()==rows
> rdd0=rdd.filter(lambda x:False)
> assert rdd0.getNumPartitions()==rows
> rdd00=rdd0.coalesce(1)
> data=rdd00.collect()
> assert data==[]
>
> Still the same error happens:
>
> 21/04/09 04:48:38 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.
> OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4643550000, 16384, 0) failed; error='Not enough space' (errno=12)
> [423.701s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f4640d28000-0x00007f4640d2c000).
> [423.701s][warning][os,thread] Attempt to deallocate stack guard pages failed.
> [423.704s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
> #
> # There is insufficient memory for the Java Runtime Environment to continue.
> # Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.
>
> A function which needs 423 seconds to crash with excessive memory consumption when trying to coalesce 70000 empty partitions is not very practical. As I do not know the limits within which coalesce without shuffling can be used safely and with acceptable performance, I will now always use coalesce with shuffling, even though in theory this comes with quite a performance penalty.
>
> Markus
>
> From: Russell Spitzer <russell.spit...@gmail.com>
> Sent: Thursday, April 8, 2021 15:24
> To: Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de>
> Cc: user@spark.apache.org
> Subject: Re: possible bug
>
> Could be that the driver JVM cannot handle the metadata required to store the partition information of a 70k-partition RDD. I see you say you have a 100GB driver, but I'm not sure where you configured that?
>
> Did you set --driver-memory 100G ?
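A quick way to check where that setting ends up is to read it back from the live context. A minimal sketch, assuming a fresh Python process (the read-back at the end is illustrative and not from the thread; when launching through spark-submit, the equivalent is the --driver-memory 100g flag Russell mentions):

import pyspark

# Build the conf, including the driver memory, *before* the SparkContext
# (and hence the driver JVM) is created; in local mode the driver is also
# the executor, so this is the only memory setting that matters here.
conf = (pyspark.SparkConf()
        .setMaster("local[64]")
        .setAppName("Test1")
        .set("spark.driver.memory", "100g"))
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# If a SparkContext already existed in this process, getOrCreate() returns
# it unchanged and the setting above is silently ignored, hence the check.
print(sc.getConf().get("spark.driver.memory"))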
> On Thu, Apr 8, 2021 at 8:08 AM Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de> wrote:
>
> > This is the reduction of an error in a complex program in which I allocated 100 GB of driver memory (= worker = executor, as it is local mode). In the example I used the default size, as the puny example shouldn't need more anyway.
> >
> > And without the coalesce, or with coalesce(1,True), everything works fine.
> >
> > I'm trying to coalesce an empty rdd with 70000 partitions into an empty rdd with 1 partition; why is this a problem without shuffling?
> >
> > From: Sean Owen <sro...@gmail.com>
> > Sent: Thursday, April 8, 2021 15:00
> > To: Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de>
> > Cc: user@spark.apache.org
> > Subject: Re: possible bug
> >
> > That's a very low-level error from the JVM. Any chance you are misconfiguring the executor size? Like to 10MB instead of 10GB, that kind of thing. Trying to think of why the JVM would have very little memory to operate. An app running out of memory would not look like this.
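Regarding Markus's question above (why coalescing to one partition is a problem without shuffling): the RDD lineage makes the difference visible. A small sketch with a reduced partition count (the master, app name, and counts here are illustrative); in PySpark, toDebugString() returns bytes, hence the decode():

import pyspark

conf = pyspark.SparkConf().setMaster("local[4]").setAppName("coalesce-lineage")
sc = pyspark.SparkContext.getOrCreate(conf=conf)

rdd0 = sc.parallelize(range(1000), 1000).filter(lambda x: False)

# Without shuffle, coalesce(1) is a narrow dependency: the single output
# partition wraps all 1000 parents, so one task must carry the metadata
# for (and compute) every one of them.
print(rdd0.coalesce(1).toDebugString().decode())

# With shuffle=True a shuffle boundary is inserted instead, so the parent
# partitions keep their own tasks and only the results are combined.
print(rdd0.coalesce(1, shuffle=True).toDebugString().decode())

With 70000 parents folded into one task, that single task is presumably what the "task of very large size" warning in the log below refers to.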
> > On Thu, Apr 8, 2021 at 7:53 AM Weiand, Markus, NMA-CFD <markus.wei...@bertelsmann.de> wrote:
> >
> > > Hi all,
> > >
> > > I'm using Spark on a c5a.16xlarge machine in the Amazon cloud (so 64 cores and 128 GB of RAM). I'm using Spark 3.0.1.
> > >
> > > The following python code leads to an exception; is this a bug, or is my understanding of the API incorrect?
> > >
> > > import pyspark
> > > conf=pyspark.SparkConf().setMaster("local[64]").setAppName("Test1")
> > > sc=pyspark.SparkContext.getOrCreate(conf)
> > > rows=70000
> > > data=list(range(rows))
> > > rdd=sc.parallelize(data,rows)
> > > assert rdd.getNumPartitions()==rows
> > > rdd0=rdd.filter(lambda x:False)
> > > assert rdd0.getNumPartitions()==rows
> > > rdd00=rdd0.coalesce(1)
> > > data=rdd00.collect()
> > > assert data==[]
> > >
> > > output when starting from PyCharm:
> > >
> > > /home/ubuntu/PycharmProjects/<projekt-dir>/venv/bin/python /opt/pycharm-2020.2.3/plugins/python/helpers/pydev/pydevconsole.py --mode=client --port=41185
> > > import sys; print('Python %s on %s' % (sys.version, sys.platform))
> > > sys.path.extend(['/home/ubuntu/PycharmProjects/<projekt-dir>'])
> > > PyDev console: starting.
> > > Python 3.8.5 (default, Jan 27 2021, 15:41:15)
> > > [GCC 9.3.0] on linux
> > > import os
> > > os.environ['PYTHONHASHSEED'] = '0'
> > > runfile('/home/ubuntu/PycharmProjects/<projekt-dir>/tests/test.py', wdir='/home/ubuntu/PycharmProjects/<projekt-dir>/tests')
> > > WARNING: An illegal reflective access operation has occurred
> > > WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.0.1.jar) to constructor java.nio.DirectByteBuffer(long,int)
> > > WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
> > > WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> > > WARNING: All illegal access operations will be denied in a future release
> > > 21/04/08 12:12:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> > > 21/04/08 12:12:29 WARN TaskSetManager: Stage 0 contains a task of very large size (4732 KiB). The maximum recommended task size is 1000 KiB.
> > > [Stage 0:> (0 + 1) / 1][423.190s][warning][os,thread] Attempt to protect stack guard pages failed (0x00007f43d23ff000-0x00007f43d2403000).
> > > [423.190s][warning][os,thread] Attempt to deallocate stack guard pages failed.
> > > OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f43d300b000, 16384, 0) failed; error='Not enough space' (errno=12)
> > > [423.231s][warning][os,thread] Failed to start thread - pthread_create failed (EAGAIN) for attributes: stacksize: 1024k, guardsize: 0k, detached.
> > > #
> > > # There is insufficient memory for the Java Runtime Environment to continue.
> > > # Native memory allocation (mmap) failed to map 16384 bytes for committing reserved memory.
> > > # An error report file with more information is saved as:
> > > # /home/ubuntu/PycharmProjects/<projekt-dir>/tests/hs_err_pid17755.log
> > > [thread 17966 also had an error]
> > > OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f4b7bd81000, 262144, 0) failed; error='Not enough space' (errno=12)
> > > ERROR:root:Exception while sending command.
> > > Traceback (most recent call last):
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
> > >     raise Py4JNetworkError("Answer from Java side is empty")
> > > py4j.protocol.Py4JNetworkError: Answer from Java side is empty
> > >
> > > During handling of the above exception, another exception occurred:
> > >
> > > Traceback (most recent call last):
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
> > >     response = connection.send_command(command)
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
> > >     raise Py4JNetworkError(
> > > py4j.protocol.Py4JNetworkError: Error while receiving
> > > ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:42439)
> > > Traceback (most recent call last):
> > >   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
> > >     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
> > >     return_value = get_return_value(
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 334, in get_return_value
> > >     raise Py4JError(
> > > py4j.protocol.Py4JError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe
> > >
> > > During handling of the above exception, another exception occurred:
> > >
> > > Traceback (most recent call last):
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
> > >     connection = self.deque.pop()
> > > IndexError: pop from an empty deque
> > >
> > > During handling of the above exception, another exception occurred:
> > >
> > > Traceback (most recent call last):
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
> > >     self.socket.connect((self.address, self.port))
> > > ConnectionRefusedError: [Errno 111] Connection refused
> > > Traceback (most recent call last):
> > >   File "/opt/spark/python/pyspark/rdd.py", line 889, in collect
> > >     sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
> > >   File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
> > >     return_value = get_return_value(
"/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line > 334, in get_return_value > > raise Py4JError( > > py4j.protocol.Py4JError: An error occurred while calling > z:org.apache.spark.api.python.PythonRDD.collectAndServe > > During handling of the above exception, another exception occurred: > > Traceback (most recent call last): > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 977, in _get_connection > > connection = self.deque.pop() > > IndexError: pop from an empty deque > > During handling of the above exception, another exception occurred: > > Traceback (most recent call last): > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 1115, in start > > self.socket.connect((self.address, self.port)) > > ConnectionRefusedError: [Errno 111] Connection refused > > During handling of the above exception, another exception occurred: > > Traceback (most recent call last): > > File "<input>", line 3, in <module> > > File > "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_bundle/pydev_umd.py", > line 197, in runfile > > pydev_imports.execfile(filename, global_vars, local_vars) # execute > the script > > File > "/opt/pycharm-2020.2.3/plugins/python/helpers/pydev/_pydev_imps/_pydev_execfile.py", > line 18, in execfile > > exec(compile(contents+"\n", file, 'exec'), glob, loc) > > File > "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py", > line 992, in <module> > > test_70000() > > File > "/home/ubuntu/PycharmProjects/SPO_as_a_Service/tests/test_modeling_paf.py", > line 974, in test_70000 > > data=rdd00.collect() > > File "/opt/spark/python/pyspark/rdd.py", line 889, in collect > > sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) > > File "/opt/spark/python/pyspark/traceback_utils.py", line 78, in __exit__ > > self._context._jsc.setCallSite(None) > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 1303, in __call__ > > answer = self.gateway_client.send_command(command) > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 1031, in send_command > > connection = self._get_connection() > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 979, in _get_connection > > connection = self._create_connection() > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 985, in _create_connection > > connection.start() > > File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", > line 1127, in start > > raise Py4JNetworkError(msg, e) > > py4j.protocol.Py4JNetworkError: An error occurred while trying to connect > to the Java server (127.0.0.1:42439 > <https://eur02.safelinks.protection.outlook.com/?url=http%3A%2F%2F127.0.0.1%3A42439%2F&data=04%7C01%7C%7C96a91861f12c46dc454b08d8fa91a005%7C1ca8bd943c974fc68955bad266b43f0b%7C0%7C0%7C637534850695460748%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&sdata=nluVRqUTQeAfMp7ZuF5AdO6IELaRcr86xGkaW6RWOF0%3D&reserved=0> > ) > > > > > > report of free -m: > > total used free shared buff/cache > available > > Mem: 127462 5548 22680 92 99233 > 120605 > > Swap: 0 0 0 > > > > Thanks > > Markus > >