[ https://issues.apache.org/jira/browse/SPARK-35256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ondrej Kokes updated SPARK-35256:
---------------------------------
    Summary: Subexpression elimination leading to a performance regression  
(was: str_to_map + split performance regression)

> Subexpression elimination leading to a performance regression
> -------------------------------------------------------------
>
>                 Key: SPARK-35256
>                 URL: https://issues.apache.org/jira/browse/SPARK-35256
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Ondrej Kokes
>            Priority: Minor
>         Attachments: bisect_log.txt, bisect_timing.csv
>
>
> I'm seeing almost double the runtime between 3.0.1 and 3.1.1 in a pipeline that
> does mostly str_to_map, split and a few other operations - all projections, no
> joins or aggregations (the groupby/count at the end is there only to trigger
> execution). I cut it down to the simplest reproducible example I could -
> removing anything from it changes the runtime difference quite dramatically,
> and even moving the two f.when branch expressions into standalone columns makes
> the difference disappear (a sketch of that variant follows the timings below).
> {code:python}
> import time
> import os
> import pyspark  
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as f
> if __name__ == '__main__':
>     print(pyspark.__version__)
>     spark = SparkSession.builder.getOrCreate()
>     filename = 'regression.csv'
>     if not os.path.isfile(filename):
>         with open(filename, 'wt') as fw:
>             fw.write('foo\n')
>             for _ in range(10_000_000):
>                 fw.write('foo=bar&baz=bak&bar=f,o,1:2:3\n')
>     df = spark.read.option('header', True).csv(filename)
>     t = time.time()
>     dd = (df
>             .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
>             .withColumn('extracted',
>                             # without this top-level split it is only 50%
>                             # slower; with it the runtime almost doubles
>                             f.split(f.split(f.col("my_map")["bar"], ",")[2], ":")[0]
>                        )
>             .select(
>                 f.when(
>                     f.col("extracted").startswith("foo"), f.col("extracted")
>                 ).otherwise(
>                     f.concat(f.lit("foo"), f.col("extracted"))
>                 ).alias("foo")
>             )
>         )
>     # dd.explain(True)
>     _ = dd.groupby("foo").count().count()
>     print("elapsed", time.time() - t)
> {code}
> Running this on 3.0.1 and 3.1.1 (both installed from PyPI, on my local macOS)
> gives:
> {code}
> 3.0.1
> elapsed 21.262351036071777
> 3.1.1
> elapsed 40.26582884788513
> {code}
> (That is, the transformation took 21 seconds on 3.0.1 and 40 seconds on 3.1.1.)
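> For reference, here is one plausible reading of the workaround mentioned above -
> precomputing the two f.when branch expressions as standalone columns (the
> 'prefixed' column name is illustrative). With this shape the runtime difference
> disappears:
> {code:python}
> # same pipeline, but the when() branches reference plain columns
> # instead of nested expressions (workaround sketch)
> dd = (df
>         .withColumn('my_map', f.expr('str_to_map(foo, "&", "=")'))
>         .withColumn('extracted',
>                     f.split(f.split(f.col("my_map")["bar"], ",")[2], ":")[0])
>         .withColumn('prefixed', f.concat(f.lit("foo"), f.col("extracted")))
>         .select(
>             f.when(f.col("extracted").startswith("foo"), f.col("extracted"))
>              .otherwise(f.col("prefixed"))
>              .alias("foo")
>         )
>      )
> {code}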
> Feel free to make the CSV smaller for a quicker feedback loop - the runtime
> scales linearly with row count (I developed this with 2M rows).
> It might be related to my previous issue, SPARK-32989, which involves similar
> operations and nesting. (Splitting on the original column rather than on a map
> value makes the difference disappear; see the variant sketch below.)
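> To illustrate that last point, a variant that splits the original column
> directly instead of a map value (illustrative sketch; with this, the 3.0.1 and
> 3.1.1 runtimes match):
> {code:python}
> # variant sketch: no str_to_map, split the raw 'foo' column directly
> dd = (df
>         .withColumn('extracted',
>                     f.split(f.split(f.col("foo"), ",")[2], ":")[0])
>         .select(
>             f.when(f.col("extracted").startswith("foo"), f.col("extracted"))
>              .otherwise(f.concat(f.lit("foo"), f.col("extracted")))
>              .alias("foo")
>         )
>      )
> {code}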
> I tried dissecting the queries in the Spark UI and via explain, but 3.0.1 and
> 3.1.1 produced identical plans.
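> Given the updated summary, one way to test the subexpression-elimination
> hypothesis is to toggle the flag spark.sql.subexpressionElimination.enabled
> (an internal SQLConf, not a public API - treat this as a diagnostic sketch)
> and re-run the job:
> {code:python}
> # diagnostic sketch: disable subexpression elimination, re-run the
> # same action, and compare elapsed times against the default run
> spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")
> t = time.time()
> _ = dd.groupby("foo").count().count()
> print("elapsed (subexpressionElimination off)", time.time() - t)
> {code}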


