Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread John Zhuge
Congratulations! Excellent work!

On Tue, Feb 13, 2024 at 8:04 PM Yufei Gu  wrote:

> Absolutely thrilled to see the project going open-source! Huge congrats to
> Chao and the entire team on this milestone!
>
> Yufei
>
>
> On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:
>
>> Hi all,
>>
>> We are very happy to announce that Project Comet, a plugin to
>> accelerate Spark query execution via leveraging DataFusion and Arrow,
>> has now been open sourced under the Apache Arrow umbrella. Please
>> check the project repo
>> https://github.com/apache/arrow-datafusion-comet for more details if
>> you are interested. We'd love to collaborate with people from the open
>> source community who share similar goals.
>>
>> Thanks,
>> Chao
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>

-- 
John Zhuge


Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread Yufei Gu
Absolutely thrilled to see the project going open-source! Huge congrats to
Chao and the entire team on this milestone!

Yufei


On Tue, Feb 13, 2024 at 12:43 PM Chao Sun  wrote:

> Hi all,
>
> We are very happy to announce that Project Comet, a plugin to
> accelerate Spark query execution via leveraging DataFusion and Arrow,
> has now been open sourced under the Apache Arrow umbrella. Please
> check the project repo
> https://github.com/apache/arrow-datafusion-comet for more details if
> you are interested. We'd love to collaborate with people from the open
> source community who share similar goals.
>
> Thanks,
> Chao
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: How do you debug a code-generated aggregate?

2024-02-13 Thread Mich Talebzadeh
Sure, thanks for the clarification. I gather what you are alluding to is: in
a distributed environment, when one does operations that involve shuffling
or repartitioning of data, the order in which that data is processed across
partitions is not guaranteed. So when repartitioning a DataFrame, the data
is redistributed across partitions, each partition may process its portion
of the data independently, and that makes debugging distributed systems
challenging.

I hope that makes sense.

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Feb 2024 at 21:25, Jack Goodson  wrote:

> Apologies if it wasn't clear, I was meaning the difficulty of debugging,
> not floating point precision :)
>
> On Wed, Feb 14, 2024 at 2:03 AM Mich Talebzadeh 
> wrote:
>
>> Hi Jack,
>>
>> "  most SQL engines suffer from the same issue... ""
>>
>> Sure. This behavior is not a bug, but rather a consequence of the
>> limitations of floating-point precision. The numbers involved in the
>> example (see SPIP [SPARK-47024] Sum of floats/doubles may be incorrect
>> depending on partitioning - ASF JIRA (apache.org)
>>  exceed the precision
>> of the double-precision floating-point representation used by default in
>> Spark and others Interesting to have a look and test the code
>>
>> This is the code:
>>
>> from pyspark.sql import SparkSession
>> from pyspark.sql.functions import col, sum
>>
>> SUM_EXAMPLE = [(1.0,), (0.0,), (1.0,), (9007199254740992.0,)]
>>
>> spark = SparkSession.builder.config("spark.log.level", "ERROR").getOrCreate()
>>
>> def compare_sums(data, num_partitions):
>>     df = spark.createDataFrame(data, "val double").coalesce(1)
>>     result1 = df.agg(sum(col("val"))).collect()[0][0]
>>     df = spark.createDataFrame(data, "val double").repartition(num_partitions)
>>     result2 = df.agg(sum(col("val"))).collect()[0][0]
>>     assert result1 == result2, f"{result1}, {result2}"
>>
>> if __name__ == "__main__":
>>     print(compare_sums(SUM_EXAMPLE, 2))
>>
>> In Python, floating-point numbers are implemented using the IEEE 754
>> standard, which has limited precision. When one performs operations with
>> very large numbers or numbers with many decimal places, one may encounter
>> precision errors.
>>
>> Running it produces:
>>
>>     print(compare_sums(SUM_EXAMPLE, 2))
>>   File "issue01.py", line 23, in compare_sums
>>     assert result1 == result2, f"{result1}, {result2}"
>> AssertionError: 9007199254740994.0, 9007199254740992.0
>>
>> In the aforementioned case, the result of the aggregation (sum) is
>> affected by the precision limits of floating-point representation. The
>> difference between 9007199254740994.0 and 9007199254740992.0 is within the
>> expected precision limitations of double-precision floating-point numbers.
>>
>> The likely cause in this example:
>>
>> When one performs an aggregate operation like sum on a DataFrame, the
>> operation may be affected by the order of the data, and here the order of
>> the data can be influenced by the number of partitions in Spark. result2
>> above creates a new DataFrame df with the same data but explicitly
>> repartitions it into two partitions (repartition(num_partitions)).
>> Repartitioning can shuffle the data across partitions, introducing a
>> different order for the subsequent aggregation. The sum operation is then
>> performed on the data in a different order, leading to a slightly
>> different result from result1.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Dad | Technologist | Solutions Architect | Engineer
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Tue, 13 Feb 2024 at 03:06, Jack Goodson 
>> wrote:
>>
>>> I may be ignorant of other debugging methods in Spark but the best
>>> success I've had is using smaller datasets (if runs take a long time) and
>>> adding intermediate output steps. This is quite different from application
>>> development in non-distributed systems where a debugger is trivial to

Re: How do you debug a code-generated aggregate?

2024-02-13 Thread Jack Goodson
Apologies if it wasn't clear, I was meaning the difficulty of debugging,
not floating point precision :)

On Wed, Feb 14, 2024 at 2:03 AM Mich Talebzadeh 
wrote:

> Hi Jack,
>
> "  most SQL engines suffer from the same issue... ""
>
> Sure. This behavior is not a bug, but rather a consequence of the
> limitations of floating-point precision. The numbers involved in the
> example (see SPIP [SPARK-47024] Sum of floats/doubles may be incorrect
> depending on partitioning - ASF JIRA (apache.org)
>  exceed the precision
> of the double-precision floating-point representation used by default in
> Spark and others Interesting to have a look and test the code
>
> This is the code:
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import col, sum
>
> SUM_EXAMPLE = [(1.0,), (0.0,), (1.0,), (9007199254740992.0,)]
>
> spark = SparkSession.builder.config("spark.log.level", "ERROR").getOrCreate()
>
> def compare_sums(data, num_partitions):
>     df = spark.createDataFrame(data, "val double").coalesce(1)
>     result1 = df.agg(sum(col("val"))).collect()[0][0]
>     df = spark.createDataFrame(data, "val double").repartition(num_partitions)
>     result2 = df.agg(sum(col("val"))).collect()[0][0]
>     assert result1 == result2, f"{result1}, {result2}"
>
> if __name__ == "__main__":
>     print(compare_sums(SUM_EXAMPLE, 2))
>
> In Python, floating-point numbers are implemented using the IEEE 754
> standard, which has limited precision. When one performs operations with
> very large numbers or numbers with many decimal places, one may encounter
> precision errors.
>
> Running it produces:
>
>     print(compare_sums(SUM_EXAMPLE, 2))
>   File "issue01.py", line 23, in compare_sums
>     assert result1 == result2, f"{result1}, {result2}"
> AssertionError: 9007199254740994.0, 9007199254740992.0
>
> In the aforementioned case, the result of the aggregation (sum) is
> affected by the precision limits of floating-point representation. The
> difference between 9007199254740994.0 and 9007199254740992.0 is within the
> expected precision limitations of double-precision floating-point numbers.
>
> The likely cause in this example:
>
> When one performs an aggregate operation like sum on a DataFrame, the
> operation may be affected by the order of the data, and here the order of
> the data can be influenced by the number of partitions in Spark. result2
> above creates a new DataFrame df with the same data but explicitly
> repartitions it into two partitions (repartition(num_partitions)).
> Repartitioning can shuffle the data across partitions, introducing a
> different order for the subsequent aggregation. The sum operation is then
> performed on the data in a different order, leading to a slightly
> different result from result1.
>
> HTH
>
> Mich Talebzadeh,
> Dad | Technologist | Solutions Architect | Engineer
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Tue, 13 Feb 2024 at 03:06, Jack Goodson  wrote:
>
>> I may be ignorant of other debugging methods in Spark but the best
>> success I've had is using smaller datasets (if runs take a long time) and
>> adding intermediate output steps. This is quite different from application
>> development in non-distributed systems where a debugger is trivial to
>> attach, but I believe it's one of the trade-offs of using a system like
>> Spark for data processing; most SQL engines suffer from the same issue. If
>> you do believe there is a bug in Spark, using the explain function like
>> Herman mentioned helps, as does looking at the Spark plan in the Spark UI.
>>
>> On Tue, Feb 13, 2024 at 9:24 AM Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> OK, I figured it out. The details are in SPARK-47024
>>>  for anyone who’s
>>> interested.
>>>
>>> It turned out to be a floating point arithmetic “bug”. The main reason I
>>> was able to figure it out was because I’ve been investigating another,
>>> unrelated bug (a real bug) related to floats, so these weird float corner
>>> cases have been top of mind.
>>>
>>> If it weren't for that, I wonder how much progress I would have made.
>>> Though I could inspect the generated code, I couldn’t figure out how to get
>>> logging statements placed in the generated code to print somewhere I could
>>> see them.
>>>
>>> Depending on how often we find ourselves debugging aggregates like this,
>>> it would be really helpful if we added some way to trace the aggregation
>>> buffer.

Re: Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread Holden Karau
This looks really cool :) Out of interest what are the differences in the
approach between this and Gluten?

On Tue, Feb 13, 2024 at 12:42 PM Chao Sun  wrote:

> Hi all,
>
> We are very happy to announce that Project Comet, a plugin to
> accelerate Spark query execution via leveraging DataFusion and Arrow,
> has now been open sourced under the Apache Arrow umbrella. Please
> check the project repo
> https://github.com/apache/arrow-datafusion-comet for more details if
> you are interested. We'd love to collaborate with people from the open
> source community who share similar goals.
>
> Thanks,
> Chao
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Introducing Comet, a plugin to accelerate Spark execution via DataFusion and Arrow

2024-02-13 Thread Chao Sun
Hi all,

We are very happy to announce that Project Comet, a plugin to
accelerate Spark query execution via leveraging DataFusion and Arrow,
has now been open sourced under the Apache Arrow umbrella. Please
check the project repo
https://github.com/apache/arrow-datafusion-comet for more details if
you are interested. We'd love to collaborate with people from the open
source community who share similar goals.

Thanks,
Chao

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Heads-up: Update on Spark 3.5.1 RC

2024-02-13 Thread Dongjoon Hyun
Thank you for the update, Jungtaek.

Dongjoon.

On Tue, Feb 13, 2024 at 7:29 AM Jungtaek Lim 
wrote:

> Hi,
>
> Just a heads-up, since I didn't give an update for a week after the last
> update in the discussion thread.
>
> I've been following the automated release process and have encountered
> several issues. I will probably file JIRA tickets and follow up with PRs.
>
> The issues I have found so far are 1) a Python library version issue in the
> release Docker image, and 2) a doc build failure in PySpark ML for Spark
> Connect. I'm deferring submitting fixes until I see a dry run succeed.
>
> Btw, I optimistically ran the process without a dry run since GA had passed
> (my bad), and the tag for RC1 was created before I saw the issues. I will
> probably need to start with RC2 after things are sorted out and the
> necessary fixes have landed on branch-3.5.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
>


Re: Extracting Input and Output Partitions in Spark

2024-02-13 Thread Daniel Saha
This would be helpful for a few use cases. For context, my team works in the
security space, and customers access data through a wrapper around Spark SQL
connected to the Hive metastore.

1. When snapshot (non-partitioned) tables are queried, it’s not clear when
the underlying snapshot was last updated. Having this information could
surface the staleness to customers per query.

2. The Hive metastore does not throw an error when non-existent partitions
are queried, e.g. year=2025. This is problematic in the security space, where
a customer might confuse a lack of results with “no hits” (a false negative,
vacuous truth). While not necessarily about freshness, this feature would
make it easy to fail the query quickly when it matches 0 input partitions.

3. Knowing the input partitions would allow routing queries to different
infrastructure based on how much data is scanned and the freshness of the
data. For example, we may want to prioritize customers scanning more recent
data (time sensitive) over those scanning 1+ year old data (backfills).
Having this info at the plan level is preferable to a DataFrame method, so
the logic could run before/outside the Spark cluster in a Scala app that
brings in only the Catalyst dependencies plus an HMS connection, without the
rest of the Spark dependencies (see the sketch below).
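
For illustration, a rough pure-Python sketch of checks 2 and 3 above. The
getInputPartitions call and the (table, partition, last_modified) shape of
its output are hypothetical here, not an existing Spark API; this only shows
how such a list could be used once available.

from datetime import datetime, timedelta

# Hypothetical output of something like plan.getInputPartitions:
# one (table, partition_spec, last_modified) tuple per input partition.
input_partitions = [
    ("table_a", "datestr=2024/02/12", datetime(2024, 2, 12, 6, 0)),
    ("table_a", "datestr=2024/02/13", datetime(2024, 2, 13, 6, 0)),
]

def check_inputs(partitions, max_age=timedelta(days=1), now=None):
    if not partitions:
        # Use case 2: the query matches no input partitions, so fail fast
        # instead of silently returning an empty result.
        raise ValueError("query matches 0 input partitions")
    now = now or datetime.now()
    # Use cases 1 and 3: surface stale inputs, e.g. to alert on them or to
    # route the query to different infrastructure.
    return [(t, p) for t, p, modified in partitions if now - modified > max_age]

print(check_inputs(input_partitions, now=datetime(2024, 2, 14)))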

Daniel Saha


On Mon, Feb 12, 2024 at 10:45 AM Aditya Sohoni
 wrote:

> Sharing an example since a few people asked me off-list:
>
> We have stored the partition details in the read/write nodes of the
> physical plan.
> So this can be accessed via the plan like plan.getInputPartitions or
> plan.getOutputPartitions, which internally loops through the nodes in the
> plan and collects the input and output partition details.
>
> Could easily be extended to a dataframe method like df.getInputPartitions
> or df.getOutputPartitions.
>
> An example:
> df = spark.sql("insert into table_b SELECT * from table_a where
> datestr > 'dd/mm/'")
> df.show()
> inputPartitions = df.getInputPartitions
> outputPartitions = df.getOutputPartitions
>
> inputPartitions and outputPartitions now hold the list of tables and the
> partitions in those tables that the query read from and wrote to, and can
> be used to power freshness alerts or any other statistics.
>
> Want to know from the dev community, would a SPIP proposal be ideal here?
>
> On Wed, Jan 31, 2024 at 11:45 AM Aditya Sohoni 
> wrote:
>
>> Hello Spark Devs!
>>
>>
>> We are from Uber's Spark team.
>>
>>
>> Our ETL jobs use Spark to read and write from Hive datasets stored in
>> HDFS. The freshness of the partition written to depends on the freshness of
>> the data in the input partition(s). We monitor this freshness score, so
>> that partitions in our critical tables always have fresh data.
>>
>>
>> We are looking for some code/helper function/utility etc built into the
>> Spark engine, through which we can programmatically get the list of
>> partitions read and written by an execution.
>>
>>
>> We looked for this in the plan, and our initial code study did not point
>> us to any such method. We have been dependent on indirect means like audit
>> logs from storage, HMS, etc., which we find difficult to use and scale.
>>
>>
>> However, the spark code does contain the list of partitions read and
>> written. The below files have the partition data for the given file format:
>>
>> 1. Input partitions from HiveTableScanExec.scala (Text format).
>>
>> 2. Input partitions from DataSourceScanExec.scala (Parquet/Hudi/Orc).
>>
>> 3. Output partitions from InsertIntoHiveTable.scala (Text format).
>>
>> 4. Output partitions from InsertIntoHadoopFsRelationCommand.scala
>> (Parquet/Hudi/Orc).
>>
>>
>> We did come up with some code that can help gather this info in a
>> programmatically friendly way. We maintained this information in the plan.
>> We wrapped the plan with some convenience classes and methods to extract
>> the partition details.
>>
>>
>> We felt that such a programmatic interface could be used for more
>> purposes as well, like showing in SHS a new set of statistics that can aid
>> in troubleshooting.
>>
>>
>> I wanted to know from the Dev Community, is there already something that
>> is/was implemented in Spark that can solve our requirement? If not, we
>> would love to share how we have implemented this and contribute to the
>> community.
>>
>>
>> Regards,
>>
>> Aditya Sohoni
>>
>


Heads-up: Update on Spark 3.5.1 RC

2024-02-13 Thread Jungtaek Lim
Hi,

Just a heads-up, since I didn't give an update for a week after the last
update in the discussion thread.

I've been following the automated release process and have encountered
several issues. I will probably file JIRA tickets and follow up with PRs.

The issues I have found so far are 1) a Python library version issue in the
release Docker image, and 2) a doc build failure in PySpark ML for Spark
Connect. I'm deferring submitting fixes until I see a dry run succeed.

Btw, I optimistically ran the process without a dry run since GA had passed
(my bad), and the tag for RC1 was created before I saw the issues. I will
probably need to start with RC2 after things are sorted out and the
necessary fixes have landed on branch-3.5.

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: How do you debug a code-generated aggregate?

2024-02-13 Thread Mich Talebzadeh
Hi Jack,

"  most SQL engines suffer from the same issue... ""

Sure. This behavior is not a bug, but rather a consequence of the
limitations of floating-point precision. The numbers involved in the
example (see SPIP [SPARK-47024] Sum of floats/doubles may be incorrect
depending on partitioning - ASF JIRA (apache.org)
 exceed the precision of
the double-precision floating-point representation used by default in Spark
and others Interesting to have a look and test the code

This is the code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

SUM_EXAMPLE = [(1.0,), (0.0,), (1.0,), (9007199254740992.0,)]

spark = SparkSession.builder.config("spark.log.level", "ERROR").getOrCreate()

def compare_sums(data, num_partitions):
    df = spark.createDataFrame(data, "val double").coalesce(1)
    result1 = df.agg(sum(col("val"))).collect()[0][0]
    df = spark.createDataFrame(data, "val double").repartition(num_partitions)
    result2 = df.agg(sum(col("val"))).collect()[0][0]
    assert result1 == result2, f"{result1}, {result2}"

if __name__ == "__main__":
    print(compare_sums(SUM_EXAMPLE, 2))

In Python, floating-point numbers are implemented using the IEEE 754
standard, which has limited precision. When one performs operations with
very large numbers or numbers with many decimal places, one may encounter
precision errors.

Running it produces:

    print(compare_sums(SUM_EXAMPLE, 2))
  File "issue01.py", line 23, in compare_sums
    assert result1 == result2, f"{result1}, {result2}"
AssertionError: 9007199254740994.0, 9007199254740992.0

In this case, the result of the aggregation (sum) is affected by the
precision limits of floating-point representation. The difference between
9007199254740994.0 and 9007199254740992.0 is within the expected precision
limitations of double-precision floating-point numbers.
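
For a sense of the scale involved, here is a minimal plain-Python sketch (no
Spark) of the rounding at play, using the same large value as the example:

# 9007199254740992.0 is 2**53, the first double whose neighbours are 2.0
# apart, so adding 1.0 to it is simply lost to rounding.
big = 9007199254740992.0
print(big == 2.0 ** 53)   # True
print(big + 1.0 == big)   # True: 1.0 falls below the spacing and is rounded away
print(big + 2.0 == big)   # False: 2.0 is representable at this magnitude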

The likely cause in this example:

When one performs an aggregate operation like sum on a DataFrame, the
operation may be affected by the order of the data, and here the order of
the data can be influenced by the number of partitions in Spark. result2
above creates a new DataFrame df with the same data but explicitly
repartitions it into two partitions (repartition(num_partitions)).
Repartitioning can shuffle the data across partitions, introducing a
different order for the subsequent aggregation. The sum operation is then
performed on the data in a different order, leading to a slightly different
result from result1.
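
To make the ordering effect concrete, a minimal sketch in plain Python using
the same four values. The exact split repartition(2) produces is not
guaranteed, but any grouping that adds a 1.0 directly to a running total of
2**53 reproduces the mismatch:

# One partition: the small values accumulate to 2.0 before meeting 2**53.
one_partition = ((1.0 + 0.0) + 1.0) + 9007199254740992.0   # 9007199254740994.0

# Two partitions: each 1.0 meets a running total of 2**53 and is rounded away.
two_partitions = (1.0 + 9007199254740992.0) + (0.0 + 1.0)  # 9007199254740992.0

print(one_partition, two_partitions)  # the two sums differ by 2.0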

HTH

Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Tue, 13 Feb 2024 at 03:06, Jack Goodson  wrote:

> I may be ignorant of other debugging methods in Spark but the best success
> I've had is using smaller datasets (if runs take a long time) and adding
> intermediate output steps. This is quite different from application
> development in non-distributed systems where a debugger is trivial to
> attach, but I believe it's one of the trade-offs of using a system like
> Spark for data processing; most SQL engines suffer from the same issue. If
> you do believe there is a bug in Spark, using the explain function like
> Herman mentioned helps, as does looking at the Spark plan in the Spark UI.
>
> On Tue, Feb 13, 2024 at 9:24 AM Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> OK, I figured it out. The details are in SPARK-47024
>>  for anyone who’s
>> interested.
>>
>> It turned out to be a floating point arithmetic “bug”. The main reason I
>> was able to figure it out was because I’ve been investigating another,
>> unrelated bug (a real bug) related to floats, so these weird float corner
>> cases have been top of mind.
>>
>> If it weren't for that, I wonder how much progress I would have made.
>> Though I could inspect the generated code, I couldn’t figure out how to get
>> logging statements placed in the generated code to print somewhere I could
>> see them.
>>
>> Depending on how often we find ourselves debugging aggregates like this,
>> it would be really helpful if we added some way to trace the aggregation
>> buffer.
>>
>> In any case, mystery solved. Thank you for the pointer!
>>
>>
>> On Feb 12, 2024, at 8:39 AM, Herman van Hovell 
>> wrote:
>>
>> There is no really easy way of getting the state of the aggregation
>> buffer, unless you are willing to modify the code generation and sprinkle
>> in some logging.