[ https://issues.apache.org/jira/browse/SPARK-22410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16235522#comment-16235522 ]
Apache Spark commented on SPARK-22410:
--------------------------------------

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/19642

> Excessive spill for Pyspark UDF when a row has shrunk
> -----------------------------------------------------
>
>                 Key: SPARK-22410
>                 URL: https://issues.apache.org/jira/browse/SPARK-22410
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Reproduced on up-to-date master
>            Reporter: Clément Stenac
>            Priority: Minor
>
> Hi,
> The following code processes about 900 KB of data and outputs around 2 MB of data. However, to process it, Spark needs to spill roughly 12 GB of data.
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, udf
> from pyspark.sql.types import StringType
>
> ss = SparkSession.builder.getOrCreate()
>
> # Create a few lines of data (5 lines).
> # Each line is made of a string and an array of 10000 strings.
> # Total size of the data is around 900 KB.
> lines_of_file = ["this is a line" for x in range(10000)]
> file_obj = ["this_is_a_foldername/this_is_a_filename", lines_of_file]
> data = [file_obj for x in range(5)]
>
> # Make a two-column dataframe out of it
> small_df = ss.sparkContext.parallelize(data).map(lambda x: (x[0], x[1])).toDF(["file", "lines"])
>
> # Explode the array: the dataframe now has 50000 rows and 2 columns,
> # and the 2nd column only contains "this is a line"
> exploded = small_df.select("file", explode("lines"))
> print("Exploded")
> print(exploded.explain())
>
> # Now process it with a trivial Pyspark UDF that touches the first
> # column (the one which was not an array)
> def split_key(s):
>     return s.split("/")[1]
>
> split_key_udf = udf(split_key, StringType())
> with_filename = exploded.withColumn("filename", split_key_udf("file"))
>
> # As expected, the explain plan is very simple
> # (BatchEval -> Explode -> Project -> ScanExisting)
> print(with_filename.explain())
>
> # Getting the head will spill around 12 GB of data
> print(with_filename.head())
> {code}
> The spill happens in the HybridRowQueue that is used to merge the part of the row that went through the Python worker with the part that didn't.
> The problem is that when the UnsafeRow is added to the HybridRowQueue, it has a totalSizeInBytes of ~240000 (seen by adding a debug message in HybridRowQueue), whereas, since this happens after the explode, the actual size of the row should be in the ~60-byte range.
> My understanding is that the row has retained the size it consumed *prior* to the explode (at that time, each of the 5 rows was indeed ~240000 bytes); the back-of-envelope check below shows how this accounts for the observed spill.
> A workaround is to call exploded.cache() before applying the UDF (see the sketch below). Going through the InMemoryColumnarTableScan "resets" the wrongful size of the UnsafeRow.
> Thanks!
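The reported figures are consistent with that explanation. A back-of-envelope check (plain Python, using only the approximate sizes quoted in the report):

{code:python}
# Back-of-envelope check using the approximate sizes quoted above.
rows_after_explode = 5 * 10000      # 50000 rows after explode()
stale_row_size = 240 * 1000         # ~240000 bytes, retained from before the explode
actual_row_size = 60                # ~60 bytes, the real post-explode row size

# Queueing 50000 rows at the stale size accounts for the observed spill:
print(rows_after_explode * stale_row_size / 1e9)   # ~12.0 GB
# whereas the rows should really occupy only about:
print(rows_after_explode * actual_row_size / 1e6)  # ~3.0 MB
{code}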
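A minimal sketch of the cache() workaround applied to the repro above (assuming the same exploded dataframe and split_key_udf from the report):

{code:python}
# Workaround from the report: materialize the exploded dataframe before
# applying the Python UDF. Rows read back through InMemoryColumnarTableScan
# are rebuilt with their correct post-explode size, so HybridRowQueue no
# longer spills ~12 GB for what is really only a few MB of data.
exploded = exploded.cache()
with_filename = exploded.withColumn("filename", split_key_udf("file"))
print(with_filename.head())
{code}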