Read file from local

2021-11-04 Thread Lynx Du
Hi experts,

I am just getting started with Spark and Scala.

I am confused about how to read local files.

I run a Spark cluster using docker-compose. There is one master and two worker
nodes. I believe this cluster is a so-called standalone cluster.

I am trying to submit a simple job to this cluster with this command:

spark-submit --class example.SimpleApp --master spark://localhost:7077 
simple-project_2.12-1.0.jar file:///tmp/sharefiles/README.md


These are my test results.

Case 1:

I mount my local (Mac desktop) /tmp/sharefiles onto each worker. It works fine.
That is, /tmp/sharefiles/README.md exists both on my local desktop and on the
worker machines.

All other cases failed to read.

Is that expected?

Why does my local desktop need to have this file? How can I remove this
limitation? In my understanding, file:///xxx should only need to exist on the
worker nodes.

Thanks
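
A likely explanation, though not confirmed in this thread: with a file:// URL
the driver lists the file to plan the input partitions, so the path must be
readable on the driver (the submitting machine, in client mode) as well as on
every executor. One way to avoid pre-placing the file on each worker is
SparkContext.addFile, which ships a driver-local file to the executors. A
minimal PySpark sketch (the Scala API is analogous; the path follows the
example above):

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("spark://localhost:7077").getOrCreate()
sc = spark.sparkContext

# The file only needs to exist on the machine running spark-submit.
sc.addFile("/tmp/sharefiles/README.md")

def count_lines(_):
    # Resolve the executor-local copy inside the task, not on the driver.
    with open(SparkFiles.get("README.md")) as f:
        return sum(1 for _ in f)

print(sc.parallelize([0], 1).map(count_lines).first())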



Pyspark 2.4.4 window functions inconsistent

2021-11-04 Thread van wilson
I am using PySpark SQL to run a SQL script with a window function that pulls
in (via lead) data from the next row to populate the current row. It works
reliably in Jupyter in VS Code using Anaconda PySpark 3.0.0, but it produces
different results every time on AWS EMR using Spark 2.4.4. Why? Are there any
known bugs with subqueries or window functions on PySpark 2.4?
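
A hedged guess rather than a confirmed 2.4 bug: lead() is only deterministic
when the window's ORDER BY defines a total order within each partition. If
rows tie on the ordering key, different cluster layouts (local vs. EMR) can
legally return different neighbours. A sketch with hypothetical column names:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("a", 1, 10), ("a", 1, 20), ("a", 2, 30)],
    ["key", "ts", "value"],
)

# The first two rows tie on ts=1, so lead() over (PARTITION BY key ORDER BY ts)
# may return either row as "next". A unique tie-breaker column makes the
# ordering total and the output stable across runs.
stable = Window.partitionBy("key").orderBy("ts", "value")
df.withColumn("next_value", F.lead("value").over(stable)).show()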


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
OK, so it boils down to how Spark creates the toPandas() DataFrame under the
bonnet, and how many executors are involved in the k8s cluster. In this model
Spark will create executors = number of nodes - 1.

On Thu, 4 Nov 2021 at 17:42, Sergey Ivanychev wrote:

> > Just to confirm: with collect() alone, is this all on the driver?
>
> I shared the screenshot with the plan in the first email. In the collect()
> case the data gets fetched to the driver without problems.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 20:37, Mich Talebzadeh wrote:
>
> Just to confirm: with collect() alone, is this all on the driver?
>


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> did you get to read the excerpts from the book of Dr. Zaharia?

I read what you have shared but didn’t manage to get your point.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:38, Gourav Sengupta wrote:
> 
> did you get to read the excerpts from the book of Dr. Zaharia?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
> Just to confirm: with collect() alone, is this all on the driver?

I shared the screenshot with the plan in the first email. In the collect() case 
the data gets fetched to the driver without problems.

Best regards,


Sergey Ivanychev

> On 4 Nov 2021, at 20:37, Mich Talebzadeh wrote:
> 
> Just to confirm: with collect() alone, is this all on the driver?


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Gourav Sengupta
Hi,

did you get to read the excerpts from the book of Dr. Zaharia?

Regards,
Gourav

On Thu, Nov 4, 2021 at 4:11 PM Sergey Ivanychev wrote:

> I’m sure that it’s running in client mode. I don’t want to have the same
> amount of RAM on drivers and executors since there’s no point in giving 64G
> of RAM to executors in my case.
>
> My question is why the collect and toPandas actions produce such different
> plans, which causes toPandas to fail on the executors.
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
Well, evidently the indication is that this is happening on the executor and
not on the driver node as assumed. Just to confirm: with collect() alone, is
this all on the driver?

HTH

On Thu, 4 Nov 2021 at 16:10, Sergey Ivanychev wrote:

> I’m sure that it’s running in client mode. I don’t want to have the same
> amount of RAM on drivers and executors since there’s no point in giving 64G
> of RAM to executors in my case.
> My question is why the collect and toPandas actions produce such different
> plans, which causes toPandas to fail on the executors.
>
> Best regards,
>
>
> Sergey Ivanychev

Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I’m sure that it’s running in client mode. I don’t want to have the same amount
of RAM on drivers and executors since there’s no point in giving 64G of RAM to
executors in my case.

My question is why the collect and toPandas actions produce such different
plans, which causes toPandas to fail on the executors.

Best regards,


Sergey Ivanychev
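
One way to investigate, sketched assuming an existing SparkSession named
spark: run both actions on the same frame, compare the two queries in the SQL
tab of the Spark UI, and check whether Arrow is enabled, since it changes how
toPandas() transfers data.

df = spark.range(1_000_000).selectExpr("id", "id % 7 AS bucket")

# Arrow changes the toPandas() path; "false" is the default if unset.
print(spark.conf.get("spark.sql.execution.arrow.pyspark.enabled", "false"))

df.explain(True)        # the plan that collect() executes
rows = df.collect()     # rows land on the driver
pdf = df.toPandas()     # compare this query's plan in the UI with the one above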


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
From your notes: ".. IIUC, in the `toPandas` case all the data gets shuffled
to a single executor that fails with OOM, which doesn’t happen in the
`collect` case. Why does it work like that? How do I collect a large dataset
that fits into memory of the driver?"

The acid test would be to use pandas and ensure that all nodes have the same
amount of RAM. The assumption here is that the master node has a larger amount
of RAM, which in theory should handle the work for Jupyter with pandas. You
can easily find out which mode Spark is deploying by looking at the Spark GUI
page.

HTH




Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-04 Thread Kapoor, Rohit
My basic test is here - https://github.com/rohitkapoor1/sparkPushDownAggregate


From: German Schiavon 
Date: Thursday, 4 November 2021 at 2:17 AM
To: huaxin gao 
Cc: Kapoor, Rohit , user@spark.apache.org 

Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

Hi,

Rohit, can you share how it looks using DSv2?

Thanks!

On Wed, 3 Nov 2021 at 19:35, huaxin gao <huaxin.ga...@gmail.com> wrote:
Great to hear. Thanks for testing this!

On Wed, Nov 3, 2021 at 4:03 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
Thanks for your guidance, Huaxin. I have been able to test the push-down
operators successfully against PostgreSQL using DS v2.


From: huaxin gao <huaxin.ga...@gmail.com>
Date: Tuesday, 2 November 2021 at 12:35 AM
To: Kapoor, Rohit <rohit.kap...@envestnet.com>
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
No need to write a customized data source reader. You may want to follow the
example here
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala#L40
to use DS v2. The example uses the H2 database; please modify it to use
PostgreSQL.

Huaxin
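
For reference, a hedged PySpark sketch of that DS v2 route (the catalog name
"pg", the connection settings, and passing pushDownAggregate through as a
catalog option are my assumptions, not code from this thread): registering a
JDBCTableCatalog routes spark.sql() through DS v2, where Spark 3.2 supports
aggregate push-down.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.catalog.pg",
            "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
    .config("spark.sql.catalog.pg.url", "jdbc:postgresql://localhost:5432/postgres")
    .config("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
    .config("spark.sql.catalog.pg.pushDownAggregate", "true")
    .getOrCreate()
)

# "public" is the PostgreSQL schema. If the push-down works, the scan node in
# the plan should show a non-empty PushedAggregates list instead of [].
spark.sql("SELECT MAX(sal) FROM pg.public.emp WHERE empid > 1").explain()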


On Mon, Nov 1, 2021 at 11:21 AM Kapoor, Rohit <rohit.kap...@envestnet.com> wrote:
Hi Huaxin,

Thanks a lot for your response. Do I need to write a custom data source reader
(in my case, for PostgreSQL) using the Spark DS v2 APIs, instead of the
standard spark.read.format("jdbc")?


Thanks,
Rohit

From: huaxin gao <huaxin.ga...@gmail.com>
Date: Monday, 1 November 2021 at 11:32 PM
To: Kapoor, Rohit <rohit.kap...@envestnet.com>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: [Spark SQL]: Aggregate Push Down / Spark 3.2
Hi Rohit,

Thanks for testing this. Seems to me that you are using DS v1. We only support 
aggregate push down in DS v2. Could you please try again using DS v2 and let me 
know how it goes?

Thanks,
Huaxin


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Sergey Ivanychev
I will follow up with the output, but I suppose Jupyter runs in client mode,
since it’s created via getOrCreate with a K8s API server as master.
Also note that I tried both “collect” and “toPandas” in the same conditions
(Jupyter client mode), so IIUC your theory doesn’t explain that difference in
execution plans.

Best regards,


Sergey Ivanychev


Re: PySpark: toPandas() vs collect() execution graph differences

2021-11-04 Thread Mich Talebzadeh
Do you have the output for the executors from the Spark GUI, in particular the
one that eventually ends up with OOM?

Also, what does

kubectl get pods -n $NAMESPACE
DRIVER_POD_NAME=`kubectl get pods -n $NAMESPACE | grep driver | awk '{print $1}'`
kubectl logs $DRIVER_POD_NAME -n $NAMESPACE
kubectl logs $EXECUTOR_WITH_OOM -n $NAMESPACE

say?


My guess is that a Jupyter notebook, like a Zeppelin notebook, does a two-stage
spark-submit under the bonnet. The job starts on the driver where the Jupyter
notebook is, but the actual job runs on the cluster itself in cluster mode. If
your assertion is right (executors don't play much of a role), just run the
whole thing in local mode!


HTH






On Wed, 3 Nov 2021 at 22:08, Sergey Ivanychev wrote:

> I want to further clarify the use case I have: an ML engineer collects
> data so as to use it for training an ML model. The driver is created within a
> Jupyter notebook and has 64G of RAM for fetching the training set and
> feeding it to the model. Naturally, in this case the executors shouldn’t be
> as big as the driver.
>
> Currently, the best solution I found is to write the dataframe to S3, and
> then read it via pd.read_parquet.
>
> Best regards,
>
>
> Sergey Ivanychev
>
> On 4 Nov 2021, at 00:18, Mich Talebzadeh wrote:
>
> 
> Thanks for clarification on the koalas case.
>
> The thread owner states and I quote: .. IIUC, in the `toPandas` case all
> the data gets shuffled to a single executor that fails with OOM,
>
> I still believe that this may be related to the way k8s handles shuffling.
> In a balanced k8s cluster this could be avoided, which does not seem to be
> the case here, as the so-called driver node has 8 times more RAM than the
> other nodes.
>
> HTH
>
>
>
>
>
> On Wed, 3 Nov 2021 at 21:00, Sean Owen  wrote:
>
>> I think you're talking about koalas, which is in Spark 3.2, but that is
>> unrelated to toPandas() and to the question of how it differs from
>> collect(). Shuffle is also unrelated.
>>
>> On Wed, Nov 3, 2021 at 3:45 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> As I understood it, in previous versions of Spark the data could not be
>>> processed and stored in pandas data frames in a distributed mode, as these
>>> data frames store data in RAM, which is on the driver in this case.
>>> However, I was under the impression that this limitation no longer
>>> exists in 3.2? So if you have a k8s cluster with 64GB of RAM for one node
>>> and 8GB of RAM for the others, and PySpark running in cluster mode, how do
>>> you expect the process to confine itself to the master node? What will
>>> happen if you increase the executor node(s) RAM to 64GB temporarily
>>> (balanced k8s cluster) and run the job again?
>>>
>>> Worth noting that the current Spark on k8s does not support external
>>> shuffle. For now we have two parameters for Dynamic Resource Allocation.
>>> These are:
>>>
>>>  --conf spark.dynamicAllocation.enabled=true \
>>>  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
>>>
>>>
>>> The idea is to use dynamic resource allocation, where the driver tracks
>>> the shuffle files and evicts only executors not storing active shuffle
>>> files. So in a nutshell these shuffle files are stored in the executors
>>> themselves in the absence of an external shuffle service. The model works
>>> on the basis of "one-container-per-Pod", meaning that one node of the
>>> cluster runs the driver and each remaining node runs one executor.
>>>
>>>
>>>
>>> HTH
>>>
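
The S3 workaround Sergey describes above might look like the sketch below
(the bucket, path, and the pyarrow/s3fs dependencies on the notebook side are
assumptions): the executors write the frame in parallel, and only the notebook
process pays the pandas memory cost, bypassing toPandas() entirely.

import pandas as pd

# Written in parallel by the executors; the URI scheme (s3a:// vs s3://)
# depends on how the cluster's Hadoop S3 connector is configured.
df.write.mode("overwrite").parquet("s3a://some-bucket/training-set/")

# Read back directly into pandas inside the notebook.
pdf = pd.read_parquet("s3://some-bucket/training-set/")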


Re: [Spark SQL]: Aggregate Push Down / Spark 3.2

2021-11-04 Thread Sunil Prabhakara
Unsubscribe.

On Mon, Nov 1, 2021 at 6:57 PM Kapoor, Rohit wrote:

> Hi,
>
> I am testing the aggregate push down for JDBC after going through the JIRA
> - https://issues.apache.org/jira/browse/SPARK-34952
>
> I have the latest Spark 3.2 setup in local mode (laptop).
>
> I have PostgreSQL v14 locally on my laptop. I am trying a basic aggregate
> query on “emp” table that has 102 rows and a simple schema with 3
> columns (empid, ename and sal) as below:
>
> val jdbcString = "jdbc:postgresql://" + "localhost" + ":5432/postgres"
>
> val jdbcDF = spark.read
>   .format("jdbc")
>   .option("url", jdbcString)
>   .option("dbtable", "emp")
>   .option("pushDownAggregate","true")
>   .option("user", "")
>   .option("password", "")
>   .load()
>   .where("empid > 1")
>   .agg(max("SAL")).alias("max_sal")
>
> The complete plan details are:
>
> == Parsed Logical Plan ==
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Analyzed Logical Plan ==
> max(SAL): int
> SubqueryAlias max_sal
> +- Aggregate [max(SAL#2) AS max(SAL)#10]
>    +- Filter (empid#0 > 1)
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Optimized Logical Plan ==
> Aggregate [max(SAL#2) AS max(SAL)#10]
> +- Project [sal#2]
>    +- Filter (isnotnull(empid#0) AND (empid#0 > 1))
>       +- Relation [empid#0,ename#1,sal#2] JDBCRelation(emp) [numPartitions=1]
>
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=false
> +- HashAggregate(keys=[], functions=[max(SAL#2)], output=[max(SAL)#10])
>    +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#15]
>       +- HashAggregate(keys=[], functions=[partial_max(SAL#2)], output=[max#13])
>          +- Scan JDBCRelation(emp) [numPartitions=1] [sal#2]
>             PushedAggregates: [], PushedFilters: [*IsNotNull(empid),
>             *GreaterThan(empid,1)], PushedGroupby: [], ReadSchema: struct
>
>
> I also checked the SQL submitted to the database, querying
> pg_stat_statements, and it confirms that the aggregate was not pushed
> down to the database. Here is the query submitted to the database:
>
> SELECT "sal" FROM emp WHERE ("empid" IS NOT NULL) AND ("empid" > $1)
>
> All the rows are read and aggregated in the Spark layer.
>
> Is there any configuration I am missing here? Why is aggregate push down
> not working for me?
>
> Any pointers would be greatly appreciated.
>
> Thanks,
> Rohit