Hey Enrico, it does help me understand it, thanks for explaining.

Regarding this comment

> PySpark and Scala should behave identically here

Is it OK that the Scala and PySpark optimizations work differently in this case?


Tue, Dec 5, 2023 at 20:08, Enrico Minack <enrico-min...@gmx.de>:

> Hi Michail,
>
> with spark.conf.set("spark.sql.planChangeLog.level", "WARN") you can see
> how Spark optimizes the query plan.
>
> In PySpark, the plan is optimized into
>
> Project ...
>   +- CollectMetrics 2, [count(1) AS count(1)#200L]
>      +- LocalTableScan <empty>, [col1#125, col2#126L, col3#127, col4#132L]
>
> The entire join gets optimized away into an empty table: Spark figures out
> that df has no rows with col1 = 'c'. Since df is never consumed / iterated,
> observation 1 does not retrieve any metrics.
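>
> (A minimal PySpark sketch of how to see this yourself; "df2" stands in for
> the joined dataframe from your gist, so the variable names are assumptions:)
>
> # log every optimizer rule that changes the plan, before running the query
> spark.conf.set("spark.sql.planChangeLog.level", "WARN")
> # print the parsed / analyzed / optimized / physical plans of the joined frame
> df2.explain(True)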
>
> In Scala, the optimization is different:
>
> *(2) Project ...
>   +- CollectMetrics 2, [count(1) AS count(1)#63L]
>      +- *(1) Project [col1#37, col2#38, col3#39, cast(null as int) AS col4#51]
>         +- *(1) Filter (isnotnull(col1#37) AND (col1#37 = c))
>            +- CollectMetrics 1, [count(1) AS count(1)#56L]
>               +- LocalTableScan [col1#37, col2#38, col3#39]
>
> where the join also gets optimized away, but the table df is still filtered
> for col1 = 'c', which iterates over its rows and collects the metrics for
> observation 1.
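>
> (A hedged way to check the same thing from PySpark is to look at the optimized
> logical plan through the internal query execution object; again df2 is assumed
> to be the joined dataframe:)
>
> # count the CollectMetrics operators that survived optimization
> optimized = df2._jdf.queryExecution().optimizedPlan().toString()
> print(optimized.count("CollectMetrics"))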
>
> Hope this helps to understand why there are no observed metrics for
> Observation("1") in your case.
>
> Enrico
>
>
>
> On 04.12.23 at 10:45, Enrico Minack wrote:
>
> Hi Michail,
>
> observations, as well as ordinary accumulators, only observe / process
> rows that are iterated / consumed by downstream stages. If the query
> plan decides to skip one side of the join, that side is removed from
> the final plan completely. Then the Observation does not retrieve any
> metrics and .get waits forever. Definitely not helpful.
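>
> (A possible workaround, sketched in PySpark and assuming df and df_join as in
> the Scala example below: consume the observed dataframe with a cheap action
> before the join, so its rows are actually iterated.)
>
> from pyspark.sql import Observation
> from pyspark.sql.functions import count
>
> o1 = Observation("1")
> df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
> df1.count()     # cheap action: df is consumed, so observation 1 gets metrics
> df2 = df1.join(df_join, "col1", "left")
> df2.show()
> print(o1.get)   # should now return count(1) -> 2 instead of blocking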
>
> When creating the Observation class, we thought about a timeout for the
> get method but could not find a use case where the user would call get
> without first executing the query. Here is a scenario where, even though
> the query is executed, there is no observation result. We will rethink this.
>
> Interestingly, your example works in Scala:
>
> import org.apache.spark.sql.Observation
> import org.apache.spark.sql.functions.count
>
> val df = Seq(("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")).toDF("col1", "col2", "col3")
> val df_join = Seq(("a", 6), ("b", 5)).toDF("col1", "col4")
>
> val o1 = Observation()
> val o2 = Observation()
>
> val df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
> val df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))
>
> df2.show()
> +----+----+----+----+
> |col1|col2|col3|col4|
> +----+----+----+----+
> +----+----+----+----+
>
> o1.get
> Map[String,Any] = Map(count(1) -> 2)
>
> o2.get
> Map[String,Any] = Map(count(1) -> 0)
>
>
> PySpark and Scala should behave identically here. I will investigate.
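>
> (For comparison, a PySpark sketch of the same pipeline; this approximates the
> gist rather than copying it, and per this thread o1.get never returns here
> because df's side of the plan is pruned away:)
>
> from pyspark.sql import SparkSession, Observation
> from pyspark.sql.functions import count
>
> spark = SparkSession.builder.getOrCreate()
>
> df = spark.createDataFrame(
>     [("a", 1, "1 2 3 4"), ("b", 2, "1 2 3 4")], ["col1", "col2", "col3"])
> df_join = spark.createDataFrame([("a", 6), ("b", 5)], ["col1", "col4"])
>
> o1 = Observation("1")
> o2 = Observation("2")
>
> df1 = df.observe(o1, count("*")).filter("col1 = 'c'")
> df2 = df1.join(df_join, "col1", "left").observe(o2, count("*"))
>
> df2.show()      # empty result, as in the Scala run above
> print(o2.get)   # observation 2 sees the empty result: count(1) -> 0
> print(o1.get)   # reported to hang: df is never iterated, so no metrics arrive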
>
> Cheers,
> Enrico
>
>
>
> On 02.12.23 at 17:11, Михаил Кулаков wrote:
>
> Hey folks, I am actively using the observe method in my Spark jobs and
> noticed some interesting behavior.
> Here is an example of working and non-working code:
> https://gist.github.com/Coola4kov/8aeeb05abd39794f8362a3cf1c66519c
>
> In a few words: if I join a dataframe after some filter rules and it
> becomes empty, observations configured on the first dataframe never
> return any results, unless some action is called on the empty dataframe
> specifically before the join.
>
> Looks like a bug to me; I would appreciate any advice on how to fix
> this behavior.
>
>
>
>
>
>
>
