Hello Winston,

I looked into the suggested code snippet, but I am getting the following error:

```
value listenerManager is not a member of org.apache.spark.sql.SparkSession
```

Although I can see it is available in the API:
https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/SparkSession.html

With Regards,
Vibhatha Abeykoon, PhD
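A note on the error: it matches a Spark Connect session, whose client-side org.apache.spark.sql.SparkSession does not expose listenerManager (the thread below reaches the same conclusion). A minimal sketch of the Connect-side fallback discussed below, df.explain("extended"), assuming a Spark 3.4+ Connect client; the endpoint, file path, and data are placeholders:

```
import org.apache.spark.sql.SparkSession

// Hypothetical Spark Connect endpoint; adjust to your deployment.
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .getOrCreate()

val df = spark.read.csv("path/to/file.csv") // placeholder path

// Prints the parsed, analyzed, optimized, and physical plans as text.
// Unlike df.queryExecution in classic Spark, it returns no LogicalPlan object.
df.explain("extended")
```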
On Wed, Aug 2, 2023 at 4:22 PM Vibhatha Abeykoon <vibha...@gmail.com> wrote:

> Hello Winston,
>
> Thanks again for this response, I will check this one out.
>
> On Wed, Aug 2, 2023 at 3:50 PM Winston Lai <weiruanl...@gmail.com> wrote:
>
>> Hi Vibhatha,
>>
>> I helped you post this question to another community. There is one answer
>> by someone else for your reference.
>>
>> To access the logical plan or optimized plan, you can register a custom
>> QueryExecutionListener and retrieve the plans during the query execution
>> process. Here's an example of how to do it in Scala:
>>
>> > import org.apache.spark.sql.SparkSession
>> > import org.apache.spark.sql.util.QueryExecutionListener
>> >
>> > // Create a custom QueryExecutionListener
>> > class CustomQueryExecutionListener extends QueryExecutionListener {
>> >   override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
>> >     // Retrieve the logical plan
>> >     val logicalPlan = qe.logical
>> >
>> >     // Retrieve the optimized plan
>> >     val optimizedPlan = qe.optimizedPlan
>> >
>> >     // Process the plans with your custom function
>> >     processPlans(logicalPlan, optimizedPlan)
>> >   }
>> >
>> >   override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
>> > }
>> >
>> > // Create a SparkSession
>> > val spark = SparkSession.builder()
>> >   .appName("Example")
>> >   .getOrCreate()
>> >
>> > // Register the custom QueryExecutionListener
>> > spark.listenerManager.register(new CustomQueryExecutionListener)
>> >
>> > // Perform your DataFrame operations
>> > val df = spark.read.csv("path/to/file.csv")
>> > val filteredDF = df.filter(df("column") > 10)
>> > val resultDF = filteredDF.select("column1", "column2")
>> >
>> > // Trigger the execution of the DataFrame to invoke the listener
>> > resultDF.show()
>>
>> Thank You & Best Regards
>> Winston Lai
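One caveat on the listener example above: processPlans is a user-supplied hook, not a Spark API, so the snippet will not compile until a function by that name is in scope. A minimal sketch of such a helper, here just printing each plan's tree (the name and body are illustrative):

```
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper assumed by the listener above; qe.logical and
// qe.optimizedPlan both yield LogicalPlan instances.
def processPlans(logicalPlan: LogicalPlan, optimizedPlan: LogicalPlan): Unit = {
  println(s"Logical plan:\n${logicalPlan.treeString}")
  println(s"Optimized plan:\n${optimizedPlan.treeString}")
}
```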
>> ------------------------------
>> *From:* Vibhatha Abeykoon <vibha...@gmail.com>
>> *Sent:* Wednesday, August 2, 2023 5:03:15 PM
>> *To:* Ruifeng Zheng <zrfli...@gmail.com>
>> *Cc:* Winston Lai <weiruanl...@gmail.com>; user@spark.apache.org <user@spark.apache.org>
>> *Subject:* Re: Extracting Logical Plan
>>
>> I understand. I sort of drew the same conclusion, but I wasn't sure.
>> Thanks, everyone, for taking the time on this.
>>
>> On Wed, Aug 2, 2023 at 2:29 PM Ruifeng Zheng <zrfli...@gmail.com> wrote:
>>
>> In Spark Connect, I think the only API to show the optimized plan is
>> `df.explain("extended")`, as Winston mentioned, but it is not a
>> LogicalPlan object.
>>
>> On Wed, Aug 2, 2023 at 4:36 PM Vibhatha Abeykoon <vibha...@gmail.com> wrote:
>>
>> Hello Ruifeng,
>>
>> Thank you for these pointers. Would it be different if I use Spark
>> Connect? I am not using the regular SparkSession. I am pretty new to
>> these APIs. I appreciate your thoughts.
>>
>> On Wed, Aug 2, 2023 at 2:00 PM Ruifeng Zheng <zrfli...@gmail.com> wrote:
>>
>> Hi Vibhatha,
>> I think those APIs are still available?
>>
>> ```
>> Welcome to
>>       ____              __
>>      / __/__  ___ _____/ /__
>>     _\ \/ _ \/ _ `/ __/  '_/
>>    /___/ .__/\_,_/_/ /_/\_\   version 3.4.1
>>       /_/
>>
>> Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 11.0.19)
>> Type in expressions to have them evaluated.
>> Type :help for more information.
>>
>> scala> val df = spark.range(0, 10)
>> df: org.apache.spark.sql.Dataset[Long] = [id: bigint]
>>
>> scala> df.queryExecution
>> res0: org.apache.spark.sql.execution.QueryExecution =
>> == Parsed Logical Plan ==
>> Range (0, 10, step=1, splits=Some(12))
>>
>> == Analyzed Logical Plan ==
>> id: bigint
>> Range (0, 10, step=1, splits=Some(12))
>>
>> == Optimized Logical Plan ==
>> Range (0, 10, step=1, splits=Some(12))
>>
>> == Physical Plan ==
>> *(1) Range (0, 10, step=1, splits=12)
>>
>> scala> df.queryExecution.optimizedPlan
>> res1: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
>> Range (0, 10, step=1, splits=Some(12))
>> ```
>>
>> On Wed, Aug 2, 2023 at 3:58 PM Vibhatha Abeykoon <vibha...@gmail.com> wrote:
>>
>> Hi Winston,
>>
>> I need to use the LogicalPlan object and process it with another function
>> I have written. In earlier Spark versions we could access that via the
>> DataFrame object. So if it can be accessed via the UI, is there an API to
>> access the object?
>>
>> On Wed, Aug 2, 2023 at 1:24 PM Winston Lai <weiruanl...@gmail.com> wrote:
>>
>> Hi Vibhatha,
>>
>> How about reading the logical plan from the Spark UI? Do you have access
>> to the Spark UI? I am not sure what infra you run your Spark jobs on, but
>> usually you should be able to view the logical and physical plans under
>> the Spark UI, in text form at least. It is independent of the language
>> (e.g., Scala/Python/R) that you use to run Spark.
>>
>> On Wednesday, August 2, 2023, Vibhatha Abeykoon <vibha...@gmail.com> wrote:
>>
>> Hi Winston,
>>
>> I am looking for a way to access the LogicalPlan object in Scala. I am
>> not sure the explain function would serve the purpose.
>>
>> On Wed, Aug 2, 2023 at 9:14 AM Winston Lai <weiruanl...@gmail.com> wrote:
>>
>> Hi Vibhatha,
>>
>> Have you tried pyspark.sql.DataFrame.explain (PySpark 3.4.1 documentation)
>> <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.explain.html#pyspark.sql.DataFrame.explain>
>> before? I am not sure what infra you have; you can try this first. If it
>> doesn't work, you may share more info, such as what platform you are
>> running your Spark jobs on and what cloud services you are using.
>>
>> On Wednesday, August 2, 2023, Vibhatha Abeykoon <vibha...@gmail.com> wrote:
>>
>> Hello,
>>
>> I recently upgraded the Spark version to 3.4.1 and I have encountered a
>> few issues. In my previous code, I was able to extract the logical plan
>> using `df.queryExecution` (df: DataFrame, in Scala), but it seems like in
>> the latest API it is not supported. Is there a way to extract the logical
>> plan or optimized plan from a DataFrame or Dataset in Spark 3.4.1?
>>
>> Best,
>> Vibhatha
>>
>> --
>> Vibhatha Abeykoon
>>
>> --
>> Ruifeng Zheng
>> E-mail: zrfli...@gmail.com