Hey Enrico, it does help to understand it, thanks for explaining. Regarding this comment:
> PySpark and Scala should behave identically here

Is it OK that the Scala and PySpark optimizations work differently in this case?

On Tue, 5 Dec 2023 at 20:08, Enrico Minack <enrico-min...@gmx.de> wrote:

> Hi Michail,
>
> With spark.conf.set("spark.sql.planChangeLog.level", "WARN") you can see
> how Spark optimizes the query plan.
>
> In PySpark, the plan is optimized into
>
> Project ...
> +- CollectMetrics 2, [count(1) AS count(1)#200L]
>    +- LocalTableScan <empty>, [col1#125, col2#126L, col3#127, col4#132L]
>
> The entire join gets optimized away into an empty table. It looks like
> Spark figures out that df has no rows with col1 = 'c', so df is never
> consumed / iterated and the observation does not retrieve any metrics.
>
> In Scala, the optimization is different:
>
> *(2) Project ...
> +- CollectMetrics 2, [count(1) AS count(1)#63L]
>    +- *(1) Project [col1#37, col2#38, col3#39, cast(null as int) AS col4#51]
>       +- *(1) Filter (isnotnull(col1#37) AND (col1#37 = c))
>          +- CollectMetrics 1, [count(1) AS count(1)#56L]
>             +- LocalTableScan [col1#37, col2#38, col3#39]
>
> Here the join also gets optimized away, but table df is still filtered
> for col1 = 'c', which iterates over the rows and collects the metrics
> for observation 1.
>
> Hope this helps to understand why there are no observed metrics for
> Observation("1") in your case.
>
> Enrico
>
>
> On 04.12.23 at 10:45, Enrico Minack wrote:
>
> Hi Michail,
>
> Observations, like ordinary accumulators, only observe / process rows
> that are iterated / consumed by downstream stages. If the query plan
> decides to skip one side of the join, that side is removed from the
> final plan completely. Then the Observation does not retrieve any
> metrics and .get waits forever. Definitely not helpful.
>
> When creating the Observation class, we thought about a timeout for the
> get method, but could not find a use case where the user would call get
> without first executing the query. Here is a scenario where, even
> though the query is executed, there is no observation result. We will
> rethink this.
>
> Interestingly, your example works in Scala:
>
> import org.apache.spark.sql.Observation
>
> val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1", "col2", "col3")
> val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")
>
> val o1 = Observation()
> val o2 = Observation()
>
> val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
> val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))
>
> df2.show()
> +----+----+----+----+
> |col1|col2|col3|col4|
> +----+----+----+----+
> +----+----+----+----+
>
> o1.get
> Map[String,Any] = Map(count(1) -> 2)
>
> o2.get
> Map[String,Any] = Map(count(1) -> 0)
>
> PySpark and Scala should behave identically here. I will investigate.
>
> Cheers,
> Enrico
>
>
> On 02.12.23 at 17:11, Михаил Кулаков wrote:
>
> Hey folks, I am actively using the observe method in my Spark jobs and
> noticed some interesting behavior. Here is an example of working and
> non-working code:
> https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c
>
> In a few words: if I join a dataframe after some filter rules and it
> becomes empty, observations configured on the first dataframe never
> return any results, unless some action is called on the empty dataframe
> specifically before the join.
>
> Looks like a bug to me. I would appreciate any advice on how to fix
> this behavior.
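In case it is useful for the archives, below is a minimal self-contained PySpark sketch of the scenario. It is a reconstruction based on Enrico's Scala example above rather than the exact gist code, and it assumes Spark >= 3.3, where pyspark.sql.Observation is available:

    from pyspark.sql import SparkSession, Observation
    from pyspark.sql.functions import count

    spark = SparkSession.builder.getOrCreate()

    # Log how the optimizer rewrites the plan, as suggested above.
    spark.conf.set("spark.sql.planChangeLog.level", "WARN")

    # Same data as in the Scala example.
    df = spark.createDataFrame(
        [("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")],
        ["col1", "col2", "col3"])
    df_join = spark.createDataFrame([("a", 6), ("b", 5)], ["col1", "col4"])

    o1 = Observation()
    o2 = Observation()

    # The filter leaves df1 empty, so in PySpark the optimizer replaces
    # the whole join with an empty LocalTableScan and o1 never sees a row.
    df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
    df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))

    # Workaround from my first mail: an action on df1 before the join
    # forces df to be consumed, after which o1 does receive its metrics.
    # df1.count()

    df2.show()      # empty result, as expected

    print(o2.get)   # {'count(1)': 0} -- observation 2 still fires
    # print(o1.get) # blocks forever in PySpark, per the plans above

With the plan-change log enabled, the PySpark run should show the plan collapsing into the empty LocalTableScan that Enrico quoted, while the same program in Scala keeps the Filter over df and therefore feeds observation 1.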