Re: [DISCUSS] SPARK-44444: Use ANSI SQL mode by default

2024-04-11 Thread Gengliang Wang
+1, enabling Spark's ANSI SQL mode in version 4.0 will significantly
enhance data quality and integrity. I fully support this initiative.

> In other words, the current Spark ANSI SQL implementation becomes the
first implementation that Spark SQL users encounter, while
`spark.sql.ansi.enabled=false` remains available in the same way without
losing any capability.

BTW, the try_* functions and the SQL Error Attribution Framework will also
be beneficial in migrating to ANSI SQL mode.
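
A quick illustration of what changes under ANSI mode and how the try_*
functions keep the permissive behavior (a minimal PySpark sketch, assuming
an active SparkSession named `spark`):

```python
# With ANSI mode on, invalid casts and division by zero raise errors
# (e.g., CAST_INVALID_INPUT, DIVIDE_BY_ZERO) instead of returning NULL.
spark.conf.set("spark.sql.ansi.enabled", "true")
# spark.sql("SELECT CAST('abc' AS INT)").show()   # would raise an error
# spark.sql("SELECT 1/0").show()                  # would raise an error

# The try_* variants return NULL on failure even under ANSI mode.
spark.sql("SELECT try_cast('abc' AS INT), try_divide(1, 0)").show()
```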


Gengliang


On Thu, Apr 11, 2024 at 7:56 PM Dongjoon Hyun 
wrote:

> Hi, All.
>
> Thanks to you, we've been achieving many things and have on-going SPIPs.
> I believe it's time to scope Apache Spark 4.0.0 (SPARK-44111) more narrowly
> by asking your opinions about Apache Spark's ANSI SQL mode.
>
> https://issues.apache.org/jira/browse/SPARK-44111
> Prepare Apache Spark 4.0.0
>
> SPARK-44444 was proposed last year (on 15/Jul/23) as one of the desirable
> items for 4.0.0 because it's a big behavior change.
>
> https://issues.apache.org/jira/browse/SPARK-44444
> Use ANSI SQL mode by default
>
> Historically, spark.sql.ansi.enabled was added in Apache Spark 3.0.0 and
> has been aiming to provide better Spark SQL compatibility in a standard way.
> We also have a daily CI job to protect the behavior.
>
> https://github.com/apache/spark/actions/workflows/build_ansi.yml
>
> However, it is still gated behind the configuration, with several known
> issues, e.g.,
>
> SPARK-41794 Reenable ANSI mode in test_connect_column
> SPARK-41547 Reenable ANSI mode in test_connect_functions
> SPARK-46374 Array Indexing is 1-based via ANSI SQL Standard
>
> To be clear, we know that many DBMSes have their own implementations of
> the SQL standard, and they are not all the same. Like them, SPARK-44444 aims
> only to enable Spark's existing configuration, `spark.sql.ansi.enabled=true`.
> There is nothing more than that.
>
> In other words, the current Spark ANSI SQL implementation becomes the first
> implementation that Spark SQL users encounter, while
> `spark.sql.ansi.enabled=false` remains available in the same way without
> losing any capability.
>
> If we don't want this change for some reason, we can simply exclude
> SPARK-44444 from SPARK-44111 as a part of the Apache Spark 4.0.0 preparation.
> It's simply time to make a go/no-go decision on this item as part of the
> overall planning for the Apache Spark 4.0.0 release. After 4.0.0, it's
> unlikely that we will aim for this again for the next four years, until 2028.
>
> WDYT?
>
> Bests,
> Dongjoon
>


Re: [PySpark]: DataFrameWriterV2.overwrite fails with spark connect

2024-04-11 Thread Ruifeng Zheng
Toki Takahashi,

Thanks for reporting this, I created
https://issues.apache.org/jira/browse/SPARK-47828 to track this bug.
I will take a look.

On Thu, Apr 11, 2024 at 10:11 PM Toki Takahashi 
wrote:

> Hi Community,
>
> I get the following error when using Spark Connect in PySpark 3.5.1
> and writing with DataFrameWriterV2.overwrite.
>
> ```
> > df.writeTo('db.table').overwrite(F.col('id')==F.lit(1))
> ...
> SparkConnectGrpcException:
> (org.apache.spark.sql.connect.common.InvalidPlanInput) Expression with
> ID: 0 is not supported
> ```
>
> I believe this is caused by the following code:
>
> https://github.com/apache/spark/blob/6e371e1df50e35d807065015525772c3c02a5995/python/pyspark/sql/connect/plan.py#L1760-L1763
>
> Is there a JIRA issue or PR regarding this error?
> If not, should we create one?
>
> Thanks,
> Toki Takahashi
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
Ruifeng Zheng
E-mail: zrfli...@gmail.com


[VOTE] Add new `Versions` in Apache Spark JIRA for Versioning of Spark Operator

2024-04-11 Thread L. C. Hsieh
Hi all,

Thanks for all discussions in the thread of "Versioning of Spark
Operator": https://lists.apache.org/thread/zhc7nb2sxm8jjxdppq8qjcmlf4rcsthh

I would like to create this vote to get the consensus for versioning
of the Spark Kubernetes Operator.

The proposal is to use an independent versioning for the Spark
Kubernetes Operator.

Please vote on adding new `Versions` in Apache Spark JIRA which can be
used for places like "Fix Version/s" in the JIRA tickets of the
operator.

The new `Versions` will use the `kubernetes-operator-` prefix, for example
`kubernetes-operator-0.1.0`.

The vote is open until April 15th 1AM (PST) and passes if a majority
+1 PMC votes are cast, with a minimum of 3 +1 votes.

[ ] +1 Adding the new `Versions` for Spark Kubernetes Operator in
Apache Spark JIRA
[ ] -1 Do not add the new `Versions` because ...

Thank you.


Note that this is neither a SPIP vote nor a release vote. I couldn't find
similar votes in previous threads, so this one is modeled on the SPIP and
release vote format. I think that should be okay, but please correct me if
this vote format doesn't work for you.

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



[DISCUSS] SPARK-44444: Use ANSI SQL mode by default

2024-04-11 Thread Dongjoon Hyun
Hi, All.

Thanks to you, we've been achieving many things and have on-going SPIPs.
I believe it's time to scope Apache Spark 4.0.0 (SPARK-44111) more narrowly
by asking your opinions about Apache Spark's ANSI SQL mode.

https://issues.apache.org/jira/browse/SPARK-44111
Prepare Apache Spark 4.0.0

SPARK-44444 was proposed last year (on 15/Jul/23) as one of the desirable
items for 4.0.0 because it's a big behavior change.

https://issues.apache.org/jira/browse/SPARK-44444
Use ANSI SQL mode by default

Historically, spark.sql.ansi.enabled was added in Apache Spark 3.0.0 and has
been aiming to provide better Spark SQL compatibility in a standard way.
We also have a daily CI job to protect the behavior.

https://github.com/apache/spark/actions/workflows/build_ansi.yml

However, it is still gated behind the configuration, with several known
issues, e.g.,

SPARK-41794 Reenable ANSI mode in test_connect_column
SPARK-41547 Reenable ANSI mode in test_connect_functions
SPARK-46374 Array Indexing is 1-based via ANSI SQL Standard

To be clear, we know that many DBMSes have their own implementations of
the SQL standard, and they are not all the same. Like them, SPARK-44444 aims
only to enable Spark's existing configuration, `spark.sql.ansi.enabled=true`.
There is nothing more than that.

In other words, the current Spark ANSI SQL implementation becomes the first
implementation that Spark SQL users encounter, while
`spark.sql.ansi.enabled=false` remains available in the same way without
losing any capability.
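
For users who prefer the legacy behavior, the opt-out stays a single
configuration, for example (a minimal sketch, assuming an active PySpark
session; it can equally be set via spark-submit --conf or
spark-defaults.conf):

```python
# Restore the permissive (non-ANSI) behavior for the current session.
spark.conf.set("spark.sql.ansi.enabled", "false")
```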

If we don't want this change for some reason, we can simply exclude
SPARK-44444 from SPARK-44111 as a part of the Apache Spark 4.0.0 preparation.
It's simply time to make a go/no-go decision on this item as part of the
overall planning for the Apache Spark 4.0.0 release. After 4.0.0, it's
unlikely that we will aim for this again for the next four years, until 2028.

WDYT?

Bests,
Dongjoon


Re: Vote on Dynamic resource allocation for structured streaming [SPARK-24815]

2024-04-11 Thread Jungtaek Lim
I'm still having a hard time reviewing this. I'm juggling a lot of context
right now, and the change is non-trivial to review in parallel. I see people
were OK with the algorithm at a high level, but from a code perspective it's
not easy to understand without knowledge of DRA. It will take me some time.
I'll see whether I can get help from other folks.

That said, it'd be nice to make iterative changes. Let's go with a first
PR with very basic functionality and ensure we document it very well so that
users are able to use this. We also have to document the limitations with
enough emphasis - like I said, having multiple queries in the same
driver (cluster) is a common case, and we found the proposal does not
consider that case. Let's make a dedicated effort on the documentation.

On Sun, Apr 7, 2024 at 8:41 AM Pavan Kotikalapudi 
wrote:

> Hi Jungtaek,
>
> Status of the current SPARK-24815: Thomas Graves is reviewing the draft PR.
> I need to add documentation about the configs and usage details; I am
> planning to do that this week.
> He did mention that it would be great if somebody with experience in
> structured streaming would take a look at the algorithm. Will you be able
> to review it?
>
> Another point I wanted to discuss: as you might have already seen in
> the design doc, we use the traditional DRA configs
> spark.dynamicAllocation.enabled,
> spark.dynamicAllocation.schedulerBacklogTimeout,
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout,
> spark.dynamicAllocation.executorIdleTimeout,
> spark.dynamicAllocation.cachedExecutorIdleTimeout,
>
> and few new configs
>
> spark.dynamicAllocation.streaming.enabled,
> spark.dynamicAllocation.streaming.executorDeallocationRatio,
> spark.dynamicAllocation.streaming.executorDeallocationTimeout.
>
> to make the DRA work for structured streaming.
>
> In the design doc I did mention that we have to calculate and set
> scale-out/scale-back thresholds based on the trigger interval.
> We (internally in the company) have helper functions to auto-generate
> the above configs based on the trigger interval and the threshold configs
> (we also got similar feedback in reviews).
> Here are such configs:
>
>   # required - should be greater than 3 seconds as that gives enough
> seconds for scaleOut and scaleBack thresholds to work with.
>   "spark.sql.streaming.triggerInterval.seconds": 
>   # optional - value should be between 0 and 1 and greater than
> scaleBackThreshold : default is 0.9
>   "spark.dynamicAllocation.streaming.scaleOutThreshold": 
>   # optional - value should be between 0 and 1 and less than
> scaleOutThreshold : default is 0.5
>   "spark.dynamicAllocation.streaming.scaleBackThreshold": 
>
> The above configs help us generate the configs below for apps with
> different trigger intervals (or if they change them for some reason):
>
> spark.dynamicAllocation.schedulerBacklogTimeout,
> spark.dynamicAllocation.sustainedSchedulerBacklogTimeout,
> spark.dynamicAllocation.executorIdleTimeout,
> spark.dynamicAllocation.cachedExecutorIdleTimeout.
>
> While our additional configs have their own limitations, I would like to
> get some feedback on whether adding these kinds of configs to automate
> the process of calculating the thresholds and their respective configs
> makes sense.
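>
> A hypothetical sketch of such a helper (the derivation rules here are
> illustrative assumptions only, not the actual logic from the draft PR or
> the internal helpers):
>
> def derive_dra_configs(trigger_interval_s, scale_out=0.9, scale_back=0.5):
>     """Derive DRA timeout configs from the trigger interval (illustrative)."""
>     return {
>         "spark.dynamicAllocation.schedulerBacklogTimeout":
>             f"{int(trigger_interval_s * scale_out)}s",
>         "spark.dynamicAllocation.sustainedSchedulerBacklogTimeout":
>             f"{int(trigger_interval_s * scale_out)}s",
>         "spark.dynamicAllocation.executorIdleTimeout":
>             f"{int(trigger_interval_s * (1 + scale_back))}s",
>         "spark.dynamicAllocation.cachedExecutorIdleTimeout":
>             f"{int(trigger_interval_s * (1 + scale_back))}s",
>     }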
>
> Thank you,
>
> Pavan
>
> On Thu, Mar 28, 2024 at 3:38 PM Pavan Kotikalapudi <
> pkotikalap...@twilio.com> wrote:
>
>> Hi Jungtaek,
>>
>> Sorry for the late reply.
>>
>> I understand the concerns about finding PMC members; I had similar
>> concerns in the past. Do you think there is something we could improve in
>> the SPIP (certain areas) so that it would get traction from PMC members?
>> Or is this SPIP perhaps not a priority for the PMC right now?
>>
>> I agree this change is small enough that it might not be tagged as an
>> SPIP. I started with the template SPIP questions so that it would be easier
>> to understand the limitations of the current system, the new solution, how
>> it works, how to use it, its limitations, etc. As you might have already
>> noticed in the PR, this change is turned off by default and will only work
>> if `spark.dynamicAllocation.streaming.enabled` is true.
>>
>> Regarding the concerns about expertise in DRA, I will find some core
>> contributors to this module/DRA and tag them in this email thread with
>> details; Mich has also highlighted the same in the past. Once we get
>> approval from them, we can further discuss and enhance this to make the
>> user experience better.
>>
>> Thank you,
>>
>> Pavan
>>
>>
>> On Tue, Mar 

Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
I think this answers your question about what to do if you need more space
on nodes.

https://spark.apache.org/docs/latest/running-on-kubernetes.html#local-storage

Local Storage


Spark supports using volumes to spill data during shuffles and other
operations. To use a volume as local storage, the volume’s name should
start with spark-local-dir-, for example:

--conf 
spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.path=
--conf 
spark.kubernetes.driver.volumes.[VolumeType].spark-local-dir-[VolumeName].mount.readOnly=false

Specifically, you can use persistent volume claims if the jobs require
large shuffle and sorting operations in executors.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=gp
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=500Gi
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false

To enable the shuffle data recovery feature via the built-in
KubernetesLocalDiskShuffleDataIO plugin, we need the following. You may
additionally want to enable
spark.kubernetes.driver.waitToReusePersistentVolumeClaim.

spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data/spark-x/executor-x
spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.KubernetesLocalDiskShuffleDataIO

If no volume is set as local storage, Spark uses temporary scratch space to
spill data to disk during shuffles and other operations. When using
Kubernetes as the resource manager, the pods will be created with an
emptyDir volume mounted for each directory listed in spark.local.dir or the
environment variable SPARK_LOCAL_DIRS. If no directories are explicitly
specified, then a default directory is created and configured appropriately.

emptyDir volumes use the ephemeral storage feature of Kubernetes and do not
persist beyond the life of the pod.
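
Putting the settings above together, a minimal PySpark sketch of requesting
an on-demand PVC for executor local storage (the master URL, container
image, storage class, and size are placeholders, not recommendations):

```python
from pyspark.sql import SparkSession

# Common prefix for the PVC-backed spark-local-dir volume configs shown above.
pvc_prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"

spark = (
    SparkSession.builder
    .master("k8s://https://<k8s-apiserver>:6443")
    .config("spark.kubernetes.container.image", "<spark-image>")
    # PVC-backed local storage used for shuffle spill.
    .config(f"{pvc_prefix}.options.claimName", "OnDemand")
    .config(f"{pvc_prefix}.options.storageClass", "gp")
    .config(f"{pvc_prefix}.options.sizeLimit", "500Gi")
    .config(f"{pvc_prefix}.mount.path", "/data")
    .config(f"{pvc_prefix}.mount.readOnly", "false")
    .getOrCreate()
)
```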

On Thu, 11 Apr 2024 at 10:29, Bjørn Jørgensen wrote:

> " In the end for my usecase I started using pvcs and pvc aware scheduling
> along with decommissioning. So far performance is good with this choice."
> How did you do this?
>
>
> On Thu, 11 Apr 2024 at 04:13, Arun Ravi wrote:
>
>> Hi Everyone,
>>
>> I had explored IBM's and AWS's S3 shuffle plugins (some time back), and I
>> had also explored AWS FSx Lustre in a few of my production jobs which have
>> ~20TB of shuffle operations with 200-300 executors. What I observed is that
>> S3 and FSx behaviour was fine during the write phase; however, I faced IOPS
>> throttling during the read phase (reads taking forever to complete). I think
>> this might be caused by the heavy use of the shuffle index file (I didn't
>> perform any extensive research on this), so I believe the shuffle manager
>> logic has to be intelligent enough to reduce the fetching of files from the
>> object store. In the end, for my use case I started using PVCs and PVC-aware
>> scheduling along with decommissioning. So far performance is good with this
>> choice.
>>
>> Thank you
>>
>> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh, 
>> wrote:
>>
>>> Hi,
>>>
>>> First thanks everyone for their contributions
>>>
>>> I was going to reply to @Enrico Minack but noticed additional info. As I
>>> understand it, for example, Apache Uniffle is an incubating project aimed
>>> at providing a pluggable shuffle service for Spark. Basically, what all
>>> these "external shuffle services" have in common is offloading shuffle data
>>> management to external services, thus reducing the memory and CPU overhead
>>> on Spark executors. That is great. While Uniffle and others enhance shuffle
>>> performance and scalability, it would be great to integrate them with the
>>> Spark UI. This may require additional development effort. I suppose the
>>> interest would be to have these external metrics incorporated into Spark
>>> with one look and feel. This may require customizing the UI to fetch and
>>> display metrics or statistics from the external shuffle services. Has any
>>> project done this?
>>>
>>> Thanks
>>>
>>> Mich Talebzadeh,
>>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>>> London
>>> United Kingdom
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* The information provided is correct to the best of my
>>> knowledge but of course cannot be guaranteed . It is essential to note
>>> that, as with any advice, quote "one test result is worth one-thousand
>>> expert opinions" (Werner von Braun).

[PySpark]: DataFrameWriterV2.overwrite fails with spark connect

2024-04-11 Thread Toki Takahashi
Hi Community,

I get the following error when using Spark Connect in PySpark 3.5.1
and writing with DataFrameWriterV2.overwrite.

```
> df.writeTo('db.table').overwrite(F.col('id')==F.lit(1))
...
SparkConnectGrpcException:
(org.apache.spark.sql.connect.common.InvalidPlanInput) Expression with
ID: 0 is not supported
```

I believe this is caused by the following code:
https://github.com/apache/spark/blob/6e371e1df50e35d807065015525772c3c02a5995/python/pyspark/sql/connect/plan.py#L1760-L1763

Is there a JIRA issue or PR regarding this error?
If not, should we create one?

Thanks,
Toki Takahashi

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: [External] Re: Versioning of Spark Operator

2024-04-11 Thread Ofir Manor
A related question - what is the expected release cadence, at least for the
next 12-18 months?
Since this is a new subproject, I am personally hoping it would have a faster
cadence at first, maybe once a month or once every couple of months... If so,
that would affect versioning.
Also, if it uses semantic versioning, since it is early days for the
subproject it might have a few releases with breaking changes until its own
API, defaults, and behavior become stable, so again, having its own
versioning might help.
Just my two cents,
   Ofir

From: L. C. Hsieh 
Sent: Wednesday, April 10, 2024 6:14 PM
To: Dongjoon Hyun 
Cc: dev@spark.apache.org 
Subject: [External] Re: Versioning of Spark Operator

This approach makes sense to me.

If the Spark K8s operator were aligned with Spark versions, for example, it
would use 4.0.0 now.
Because these JIRA tickets are not actually targeting Spark 4.0.0, that
would cause confusion and more questions, like whether we should include
Spark operator JIRAs in the release notes when we cut a Spark release, etc.

So I think an independent version number for Spark K8s operator would
be a better option.

If there are no more options or comments, I will create a vote later
to create new "Versions" in Apache Spark JIRA.

Thank you all.

On Wed, Apr 10, 2024 at 12:20 AM Dongjoon Hyun  wrote:
>
> Ya, that would work.
>
> Inevitably, I looked at Apache Flink K8s Operator's JIRA and GitHub repo.
>
> It looks reasonable to me.
>
> Although they share the same JIRA, they choose different patterns per place.
>
> 1. In the POM file and Maven artifact, an independent version number,
> e.g., 1.8.0
>
> 2. Tag is also based on the independent version number
> https://github.com/apache/flink-kubernetes-operator/tags
> - release-1.8.0
> - release-1.7.0
>
> 3. JIRA Fixed Version is `kubernetes-operator-` prefix.
> https://issues.apache.org/jira/browse/FLINK-34957
> > Fix Version/s: kubernetes-operator-1.9.0
>
> Maybe, we can borrow this pattern.
>
> I guess we need a vote for any further decision because we need to create new 
> `Versions` in Apache Spark JIRA.
>
> Dongjoon.
>

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: External Spark shuffle service for k8s

2024-04-11 Thread Bjørn Jørgensen
" In the end for my usecase I started using pvcs and pvc aware scheduling
along with decommissioning. So far performance is good with this choice."
How did you do this?


On Thu, 11 Apr 2024 at 04:13, Arun Ravi wrote:

> Hi Everyone,
>
> I had explored IBM's and AWS's S3 shuffle plugins (some time back), and I
> had also explored AWS FSx Lustre in a few of my production jobs which have
> ~20TB of shuffle operations with 200-300 executors. What I observed is that
> S3 and FSx behaviour was fine during the write phase; however, I faced IOPS
> throttling during the read phase (reads taking forever to complete). I think
> this might be caused by the heavy use of the shuffle index file (I didn't
> perform any extensive research on this), so I believe the shuffle manager
> logic has to be intelligent enough to reduce the fetching of files from the
> object store. In the end, for my use case I started using PVCs and PVC-aware
> scheduling along with decommissioning. So far performance is good with this
> choice.
>
> Thank you
>
> On Tue, 9 Apr 2024, 15:17 Mich Talebzadeh, 
> wrote:
>
>> Hi,
>>
>> First thanks everyone for their contributions
>>
>> I was going to reply to @Enrico Minack but noticed additional info. As I
>> understand it, for example, Apache Uniffle is an incubating project aimed
>> at providing a pluggable shuffle service for Spark. Basically, what all
>> these "external shuffle services" have in common is offloading shuffle data
>> management to external services, thus reducing the memory and CPU overhead
>> on Spark executors. That is great. While Uniffle and others enhance shuffle
>> performance and scalability, it would be great to integrate them with the
>> Spark UI. This may require additional development effort. I suppose the
>> interest would be to have these external metrics incorporated into Spark
>> with one look and feel. This may require customizing the UI to fetch and
>> display metrics or statistics from the external shuffle services. Has any
>> project done this?
>>
>> Thanks
>>
>> Mich Talebzadeh,
>> Technologist | Solutions Architect | Data Engineer  | Generative AI
>> London
>> United Kingdom
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* The information provided is correct to the best of my
>> knowledge but of course cannot be guaranteed . It is essential to note
>> that, as with any advice, quote "one test result is worth one-thousand
>> expert opinions" (Werner von Braun).
>>
>>
>> On Mon, 8 Apr 2024 at 14:19, Vakaris Baškirov <
>> vakaris.bashki...@gmail.com> wrote:
>>
>>> I see that both Uniffle and Celeborn support S3/HDFS backends, which is
>>> great.
>>> In the case someone is using S3/HDFS, I wonder what would be the
>>> advantages of using Celeborn or Uniffle vs the IBM shuffle service plugin
>>> or the Cloud Shuffle Storage Plugin from AWS?
>>>
>>> These plugins do not require deploying a separate service. Are there any
>>> advantages to using Uniffle/Celeborn in the case of using an S3 backend,
>>> which would require deploying a separate service?
>>>
>>> Thanks
>>> Vakaris
>>>
>>> On Mon, Apr 8, 2024 at 10:03 AM roryqi  wrote:
>>>
 Apache Uniffle (incubating) may be another solution.
 You can see
 https://github.com/apache/incubator-uniffle

 https://uniffle.apache.org/blog/2023/07/21/Uniffle%20-%20New%20chapter%20for%20the%20shuffle%20in%20the%20cloud%20native%20era

 On Mon, Apr 8, 2024 at 07:15, Mich Talebzadeh wrote:

> Splendid
>
> The configurations below can be used with k8s deployments of Spark.
> Spark applications running on k8s can utilize these configurations to
> seamlessly access data stored in Google Cloud Storage (GCS) and Amazon S3.
>
> For Google GCS we may have
>
> spark_config_gcs = {
> "spark.kubernetes.authenticate.driver.serviceAccountName":
> "service_account_name",
> "spark.hadoop.fs.gs.impl":
> "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
> "spark.hadoop.google.cloud.auth.service.account.enable": "true",
> "spark.hadoop.google.cloud.auth.service.account.json.keyfile":
> "/path/to/keyfile.json",
> }
>
> For Amazon S3 similar
>
> spark_config_s3 = {
> "spark.kubernetes.authenticate.driver.serviceAccountName":
> "service_account_name",
> "spark.hadoop.fs.s3a.impl":
> "org.apache.hadoop.fs.s3a.S3AFileSystem",
> "spark.hadoop.fs.s3a.access.key": "s3_access_key",
> "spark.hadoop.fs.s3a.secret.key": "secret_key",
> }
>
>
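> A minimal sketch of applying one of these config maps when building a
> SparkSession (illustrative only; the app name and bucket path are
> placeholders):
>
> from pyspark.sql import SparkSession
>
> builder = SparkSession.builder.appName("gcs-example")
> for key, value in spark_config_gcs.items():
>     builder = builder.config(key, value)
> spark = builder.getOrCreate()
> df = spark.read.parquet("gs://some-bucket/path/")  # placeholder bucket
>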
> To implement these configurations and enable Spark applications to
> interact with GCS and S3, I