Re: How do you debug a code-generated aggregate?

2024-02-12 Thread Jack Goodson
I may be ignorant of other debugging methods in Spark, but the best success
I've had is using smaller datasets (if runs take a long time) and adding
intermediate output steps. This is quite different from application
development in non-distributed systems, where a debugger is trivial to
attach, but I believe it's one of the trade-offs of using a system like
Spark for data processing; most SQL engines suffer from the same issue. If
you do believe there is a bug in Spark, using the explain function like
Herman mentioned helps, as does looking at the Spark plan in the Spark UI.
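
As a rough illustration, here is a minimal PySpark sketch of what I mean by
intermediate output steps (the table and column names are made up for
illustration, not from any real job):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Work on a small, fast sample while debugging a long-running job.
df = spark.table("some_table").limit(1000)

# Materialize and inspect intermediate steps instead of attaching a debugger.
filtered = df.where(F.col("amount") > 0)
filtered.show(10, truncate=False)

aggregated = filtered.groupBy("key").agg(F.sum("amount").alias("total"))
aggregated.show(10, truncate=False)

# If you suspect a bug in Spark itself, check the physical plan too.
aggregated.explain()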

On Tue, Feb 13, 2024 at 9:24 AM Nicholas Chammas wrote:

> OK, I figured it out. The details are in SPARK-47024 for anyone who's
> interested.
>
> It turned out to be a floating point arithmetic “bug”. The main reason I
> was able to figure it out was because I’ve been investigating another,
> unrelated bug (a real bug) related to floats, so these weird float corner
> cases have been top of mind.
>
> If it weren't for that, I wonder how much progress I would have made.
> Though I could inspect the generated code, I couldn’t figure out how to get
> logging statements placed in the generated code to print somewhere I could
> see them.
>
> Depending on how often we find ourselves debugging aggregates like this,
> it would be really helpful if we added some way to trace the aggregation
> buffer.
>
> In any case, mystery solved. Thank you for the pointer!
>
>
> On Feb 12, 2024, at 8:39 AM, Herman van Hovell wrote:
>
> There is no really easy way of getting the state of the aggregation
> buffer, unless you are willing to modify the code generation and sprinkle
> in some logging.
>
> What I would start with is dumping the generated code by calling
> explain('codegen') on the DataFrame. That helped me to find similar issues
> in most cases.
>
> HTH
>
> On Sun, Feb 11, 2024 at 11:26 PM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Consider this example:
>>
>> >>> from pyspark.sql.functions import sum
>> >>> spark.range(4).repartition(2).select(sum("id")).show()
>> +-------+
>> |sum(id)|
>> +-------+
>> |      6|
>> +-------+
>>
>>
>> I’m trying to understand how this works because I’m investigating a bug
>> in this kind of aggregate.
>>
>> I see that doProduceWithoutKeys and doConsumeWithoutKeys are
>> called, and I believe they are responsible for computing a declarative
>> aggregate like `sum`. But I’m not sure how I would debug the generated
>> code, or the inputs that drive what code gets generated.
>>
>> Say you were running the above example and it was producing an incorrect
>> result, and you knew the problem was somehow related to the sum. How would
>> you troubleshoot it to identify the root cause?
>>
>> Ideally, I would like some way to track how the aggregation buffer
>> mutates as the computation is executed, so I can see something roughly like:
>>
>> [0, 1, 2, 3]
>> [1, 5]
>> [6]
>>
>>
>> Is there some way to trace a declarative aggregate like this?
>>
>> Nick
>>
>>
>


Re: How do you debug a code-generated aggregate?

2024-02-12 Thread Nicholas Chammas
OK, I figured it out. The details are in SPARK-47024 for anyone who's
interested.

It turned out to be a floating point arithmetic “bug”. The main reason I was 
able to figure it out was because I’ve been investigating another, unrelated 
bug (a real bug) related to floats, so these weird float corner cases have been 
top of mind.

If it weren't for that, I wonder how much progress I would have made. Though I 
could inspect the generated code, I couldn’t figure out how to get logging 
statements placed in the generated code to print somewhere I could see them.

Depending on how often we find ourselves debugging aggregates like this, it 
would be really helpful if we added some way to trace the aggregation buffer.
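
For context, a rough, driver-side approximation of the kind of trace I have
in mind (this is not the codegen path at all; it just recomputes the
per-partition partial sums with plain RDD operations so the intermediate
buffer values are visible on the driver):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(4).repartition(2)

# One partial sum per partition. Which rows land in which partition depends
# on the round-robin repartitioning, so the exact split can vary.
partials = df.rdd.mapPartitions(lambda rows: [sum(r.id for r in rows)]).collect()
print(partials)         # e.g. [1, 5]

# Final merge step.
print([sum(partials)])  # [6]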

In any case, mystery solved. Thank you for the pointer!


> On Feb 12, 2024, at 8:39 AM, Herman van Hovell  wrote:
> 
> There is no really easy way of getting the state of the aggregation buffer, 
> unless you are willing to modify the code generation and sprinkle in some 
> logging.
> 
> What I would start with is dumping the generated code by calling 
> explain('codegen') on the DataFrame. That helped me to find similar issues in 
> most cases.
> 
> HTH
> 
> On Sun, Feb 11, 2024 at 11:26 PM Nicholas Chammas wrote:
>> Consider this example:
>> >>> from pyspark.sql.functions import sum
>> >>> spark.range(4).repartition(2).select(sum("id")).show()
>> +-------+
>> |sum(id)|
>> +-------+
>> |      6|
>> +-------+
>> 
>> I’m trying to understand how this works because I’m investigating a bug in 
>> this kind of aggregate.
>> 
>> I see that doProduceWithoutKeys and doConsumeWithoutKeys are called, and I
>> believe they are responsible for computing a declarative
>> aggregate like `sum`. But I’m not sure how I would debug the generated code, 
>> or the inputs that drive what code gets generated.
>> 
>> Say you were running the above example and it was producing an incorrect 
>> result, and you knew the problem was somehow related to the sum. How would 
>> you troubleshoot it to identify the root cause?
>> 
>> Ideally, I would like some way to track how the aggregation buffer mutates 
>> as the computation is executed, so I can see something roughly like:
>> [0, 1, 2, 3]
>> [1, 5]
>> [6]
>> 
>> Is there some way to trace a declarative aggregate like this?
>> 
>> Nick
>> 



Re: Extracting Input and Output Partitions in Spark

2024-02-12 Thread Aditya Sohoni
Sharing an example since a few people asked me off-list:

We have stored the partition details in the read/write nodes of the
physical plan.
These can be accessed via the plan, e.g. plan.getInputPartitions or
plan.getOutputPartitions, which internally loop through the nodes in the
plan and collect the input and output partition details.

This could easily be extended to a DataFrame method like df.getInputPartitions
or df.getOutputPartitions.

An example:

df = spark.sql("insert into table_b SELECT * from table_a where datestr > 'dd/mm/'")
df.show()
inputPartitions = df.getInputPartitions
outputPartitions = df.getOutputPartitions

inputPartitions and outputPartitions now hold the list of tables and the
partitions in those tables that the query read from and wrote to. This can
be used to power freshness alerts or any other statistics.
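
A hedged sketch of how this could be consumed from Python
(getInputPartitions/getOutputPartitions are the proposal, not an existing
Spark API, and the return shape assumed here, a dict of table name to
Hive-style partition specs, is just for illustration):

input_partitions = df.getInputPartitions
# e.g. {"table_a": ["datestr=2024-02-10", "datestr=2024-02-11"]}
for table, partitions in input_partitions.items():
    latest = max(partitions)
    print(f"freshness check: {table}, newest partition read = {latest}")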

I'd like to know from the dev community: would a SPIP proposal be ideal here?

On Wed, Jan 31, 2024 at 11:45 AM Aditya Sohoni wrote:

> Hello Spark Devs!
>
>
> We are from Uber's Spark team.
>
>
> Our ETL jobs use Spark to read and write from Hive datasets stored in
> HDFS. The freshness of the partition written to depends on the freshness of
> the data in the input partition(s). We monitor this freshness score, so
> that partitions in our critical tables always have fresh data.
>
>
> We are looking for some code/helper function/utility etc built into the
> Spark engine, through which we can programmatically get the list of
> partitions read and written by an execution.
>
>
> We looked for this in the plan, but our initial code study did not point
> us to any such method. We have been dependent on indirect ways like audit
> logs of storage, HMS, etc., which we find difficult to use and scale.
>
>
> However, the spark code does contain the list of partitions read and
> written. The below files have the partition data for the given file format:
>
> 1. Input partitions from HiveTableScanExec.scala (Text format).
>
> 2. Input partitions from DataSourceScanExec.scala (Parquet/Hudi/Orc).
>
> 3. Output partitions from InsertIntoHiveTable.scala (Text format).
>
> 4. Output partitions from
> InsertIntoHadoopFsRelationCommand.scala (Parquet/Hudi/Orc).
>
>
> We did come up with some code that can help gather this info in a
> programmatically friendly way. We maintained this information in the plan.
> We wrapped the plan with some convenience classes and methods to extract
> the partition details.
>
>
> We felt that such a programmatic interface could be used for more purposes
> as well, like showing in SHS a new set of statistics that can aid in
> troubleshooting.
>
>
> I wanted to know from the Dev Community, is there already something that
> is/was implemented in Spark that can solve our requirement? If not, we
> would love to share how we have implemented this and contribute to the
> community.
>
>
> Regards,
>
> Aditya Sohoni
>


Re: How do you debug a code-generated aggregate?

2024-02-12 Thread Herman van Hovell
There is no really easy way of getting the state of the aggregation buffer,
unless you are willing to modify the code generation and sprinkle in some
logging.

What I would start with is dumping the generated code by calling
explain('codegen') on the DataFrame. That helped me to find similar issues
in most cases.
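
In PySpark that looks roughly like the sketch below, reusing the toy query
from the original mail (the import alias just avoids shadowing the built-in
sum):

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sum_

spark = SparkSession.builder.getOrCreate()

df = spark.range(4).repartition(2).select(sum_("id"))

# Dump the whole-stage generated Java code for this plan.
df.explain(mode="codegen")

# The regular physical plan is often a useful first look as well.
df.explain()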

HTH

On Sun, Feb 11, 2024 at 11:26 PM Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Consider this example:
>
> >>> from pyspark.sql.functions import sum
> >>> spark.range(4).repartition(2).select(sum("id")).show()
> +-------+
> |sum(id)|
> +-------+
> |      6|
> +-------+
>
> I’m trying to understand how this works because I’m investigating a bug in
> this kind of aggregate.
>
> I see that doProduceWithoutKeys and doConsumeWithoutKeys are
> called, and I believe they are responsible for computing a declarative
> aggregate like `sum`. But I’m not sure how I would debug the generated
> code, or the inputs that drive what code gets generated.
>
> Say you were running the above example and it was producing an incorrect
> result, and you knew the problem was somehow related to the sum. How would
> you troubleshoot it to identify the root cause?
>
> Ideally, I would like some way to track how the aggregation buffer mutates
> as the computation is executed, so I can see something roughly like:
>
> [0, 1, 2, 3]
> [1, 5]
> [6]
>
> Is there some way to trace a declarative aggregate like this?
>
> Nick
>
>