[ https://issues.apache.org/jira/browse/SPARK-30552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Franz updated SPARK-30552:
--------------------------

Chained spark column expressions with distinct windows specs produce inefficient DAG
------------------------------------------------------------------------------------

Key: SPARK-30552
URL: https://issues.apache.org/jira/browse/SPARK-30552
Project: Spark
Issue Type: Bug
Components: PySpark, Spark Core
Affects Versions: 2.4.4
Environment:
    python      : 3.6.9.final.0
    python-bits : 64
    OS          : Windows
    OS-release  : 10
    machine     : AMD64
    processor   : Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
    pyspark     : 2.4.4
    pandas      : 0.25.3
    numpy       : 1.17.4
    pyarrow     : 0.15.1
Reporter: Franz
Priority: Major

h2. Context

Let's say you are dealing with time series data, and your desired outcome relies on multiple window functions with distinct window specifications.
The result may resemble a single Spark column expression, like an identifier for intervals.

h2. Status Quo

Usually, I don't store intermediate results with `df.withColumn` but rather chain/stack column expressions and trust Spark to find the most effective DAG (when dealing with DataFrames).

h2. Reproducible example

However, in the following example (PySpark 2.4.4 standalone), storing an intermediate result with `df.withColumn` reduces the DAG complexity. Consider the following test setup:

{code:python}
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

dfp = pd.DataFrame(
    {
        "col1": np.random.randint(0, 5, size=100),
        "col2": np.random.randint(0, 5, size=100),
        "col3": np.random.randint(0, 5, size=100),
        "col4": np.random.randint(0, 5, size=100),
    }
)

df = spark.createDataFrame(dfp)
df.show(5)

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   4|   1|
|   0|   2|   3|   0|
|   2|   0|   1|   0|
|   4|   1|   1|   2|
|   1|   3|   0|   4|
+----+----+----+----+
only showing top 5 rows
{code}

The computation is arbitrary. Basically, we have 2 window specs and 3 computational steps. The 3 computational steps depend on each other and use alternating window specs:

{code:python}
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3)
{code}

Inspecting the physical plan via `df_result.explain()` reveals 4 exchanges and sorts!
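To make "number of shuffles" concrete: each `Exchange hashpartitioning(...)` node in the physical plan marks one shuffle boundary. A small helper (my own sketch, not part of the original report) can count them from the captured `explain()` output, since `explain()` prints to stdout rather than returning a string in Spark 2.4:

{code:python}
import re
from contextlib import redirect_stdout
from io import StringIO

def count_shuffles(plan: str) -> int:
    """Count shuffle boundaries (Exchange nodes) in a physical plan string."""
    return len(re.findall(r"Exchange hashpartitioning", plan))

def captured_plan(df) -> str:
    """Capture the stdout output of df.explain() as a string."""
    buf = StringIO()
    with redirect_stdout(buf):
        df.explain()
    return buf.getvalue()

# Example against a plan fragment shaped like Spark's output:
sample = (
    "+- Exchange hashpartitioning(col3#2L, 200)\n"
    "   +- Exchange hashpartitioning(col1#0L, 200)\n"
)
print(count_shuffles(sample))  # -> 2
{code}

With the setup above, `count_shuffles(captured_plan(df_result))` would report the 4 exchanges discussed below.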
However, only 3 should be necessary here, because we change the window spec only twice.

{code:python}
df_result.explain()

== Physical Plan ==
*(7) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (_we0#25L > 1) THEN _we1#26L END AS result#22L]
+- Window [lag(_w0#23L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#25L], [col3#2L], [col4#3L ASC NULLS FIRST]
   +- *(6) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col3#2L, 200)
         +- *(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _we1#26L]
            +- Window [max(_w1#24L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we1#26L], [col1#0L], [col2#1L ASC NULLS FIRST]
               +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col1#0L, 200)
                     +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, _w0#23L, _w1#24L]
                        +- Window [lag(_w0#27L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w1#24L], [col3#2L], [col4#3L ASC NULLS FIRST]
                           +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                              +- Exchange hashpartitioning(col3#2L, 200)
                                 +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#27L, lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#23L], [col1#0L], [col2#1L ASC NULLS FIRST]
                                    +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                                       +- Exchange hashpartitioning(col1#0L, 200)
                                          +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
{code}
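For intuition, and following the counting argument above (this helper is my own illustration, not something Spark exposes): consecutive window functions can share one sort/exchange only while their window specs match, so a lower bound on the shuffles for a chain of window steps is one for the first spec plus one per change of spec between adjacent steps:

{code:python}
def min_exchanges(specs):
    """Lower bound on shuffles for window steps evaluated in order:
    one for the first spec, plus one for every change of spec
    between adjacent steps."""
    if not specs:
        return 0
    changes = sum(1 for a, b in zip(specs, specs[1:]) if a != b)
    return 1 + changes

# The example's three steps use specs w1, w2, w1 -> two changes:
print(min_exchanges(["w1", "w2", "w1"]))  # -> 3
{code}

The plan above instead contains 4 exchanges, one more than this lower bound.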
h2. Improvement

To get a better DAG, we slightly modify the code to store the column expression of `step2` with `withColumn` and just pass a reference to this column afterwards. The new physical plan indeed requires only 3 shuffles!

{code:python}
w1 = Window.partitionBy("col1").orderBy("col2")
w2 = Window.partitionBy("col3").orderBy("col4")

# first step, arbitrary window func over 1st window
step1 = F.lag("col3").over(w1)

# second step, arbitrary window func over 2nd window with step 1
step2 = F.lag(step1).over(w2)

# store the intermediate result in a temporary column
df = df.withColumn("tmp_variable", step2)
step2 = F.col("tmp_variable")

# third step, arbitrary window func over 1st window with step 2
step3 = F.when(step2 > 1, F.max(step2).over(w1))

df_result = df.withColumn("result", step3).drop("tmp_variable")

df_result.explain()

== Physical Plan ==
*(5) Project [col1#0L, col2#1L, col3#2L, col4#3L, CASE WHEN (tmp_variable#33L > 1) THEN _we0#42L END AS result#41L]
+- Window [max(tmp_variable#33L) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#42L], [col1#0L], [col2#1L ASC NULLS FIRST]
   +- *(4) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#0L, 200)
         +- *(3) Project [col1#0L, col2#1L, col3#2L, col4#3L, tmp_variable#33L]
            +- Window [lag(_w0#34L, 1, null) windowspecdefinition(col3#2L, col4#3L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS tmp_variable#33L], [col3#2L], [col4#3L ASC NULLS FIRST]
               +- *(2) Sort [col3#2L ASC NULLS FIRST, col4#3L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(col3#2L, 200)
                     +- Window [lag(col3#2L, 1, null) windowspecdefinition(col1#0L, col2#1L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _w0#34L], [col1#0L], [col2#1L ASC NULLS FIRST]
                        +- *(1) Sort [col1#0L ASC NULLS FIRST, col2#1L ASC NULLS FIRST], false, 0
                           +- Exchange hashpartitioning(col1#0L, 200)
                              +- Scan ExistingRDD[col1#0L,col2#1L,col3#2L,col4#3L]
{code}

h2. Relevance

My original example was even more complex and resulted in an even greater difference between the DAGs (on real-world data, up to 10 times slower).

h2. Question

Does anyone have an explanation for this odd behavior? I had thought that stacking/chaining column expressions is best practice, since it allows Spark to optimize intermediate steps most effectively (in contrast to creating references for intermediate results).

Thanks in advance.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org