lidavidm commented on pull request #29818:
URL: https://github.com/apache/spark/pull/29818#issuecomment-710415159


   Running the demo again gives these two plots. While the memory usage looks 
identical, in the no-self-destruct case, Python gets OOMKilled, while it does 
not get OOMKilled in the other case. The reason why the memory usage looks so 
similar is that jemalloc doesn't immediately return unused memory to the OS, 
but rather, when under memory pressure, signals a background thread to start 
cleaning up memory; because of this delay, memory still gets consumed, but at a 
slower pace.
   
   Without self-destruct
   
   
![no-self-destruct](https://user-images.githubusercontent.com/327919/96297169-72d9a080-0fbe-11eb-8014-d8fb8990d677.png)
   
   With self-destruct
   
![self-destruct](https://user-images.githubusercontent.com/327919/96297184-766d2780-0fbe-11eb-822b-753b9efb84c2.png)
   
   ```
   import time
   
   import pyarrow
   pyarrow.jemalloc_set_decay_ms(0)
   
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import rand
   
   self_destruct = "true"
   print('self_destruct:', self_destruct)
   spark = SparkSession.builder \
       .master("local") \
       .appName("demo") \
       .config("spark.driver.maxResultSize", "8g") \
       .config("spark.driver.memory", "4g") \
       .config("spark.executor.memory", "512m") \
       .config("spark.worker.memory", "512m") \
       .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
       .config("spark.sql.execution.arrow.pyspark.selfDestruct.enabled", 
"true") \
       .getOrCreate()
   
   # 6 GiB dataframe. Tweak this to adjust for the amount of RAM you have
   # (target > ~1/2 of free memory). I had ~8 GiB free for this demo.
   # union() generates a dataframe that doesn't take so much memory in Java
   rows = 2 ** 17
   cols = 64
   df = spark.range(0, rows).select(*[rand(seed=i) for i in range(cols)])
   df = df.union(df).union(df).union(df).union(df).union(df)
   df = df.union(df)
   df = df.union(df)
   df = df.union(df)
   
   pdf = df.toPandas()
   
   print('================ MEMORY USAGE:', sum(pdf.memory_usage()) / 2**20, 
"MiB")
   # Give memory_profiler some more time
   time.sleep(2)
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to