Re: Should python-2 be supported in Spark 3.0?

2019-05-29 Thread shane knapp
>
> I don't have a good sense of the overhead of continuing to support
> Python 2; is it large enough to consider dropping it in Spark 3.0?
>
> from the build/test side, it will actually be pretty easy to continue
support for python2.7 for spark 2.x as the feature sets won't be expanding.

that being said, i will be cracking a bottle of champagne when i can delete
all of the ansible and anaconda configs for python2.x.  :)

shane
-- 
Shane Knapp
UC Berkeley EECS Research / RISELab Staff Technical Lead
https://rise.cs.berkeley.edu


Re: Should python-2 be supported in Spark 3.0?

2019-05-29 Thread Jules Damji
Here’s the tweet from the horse’s mouth: 

https://twitter.com/gvanrossum/status/1133496146700058626?s=21

Cheers 
Jules 
—
Sent from my iPhone
Pardon the dumb thumb typos :)

> On May 29, 2019, at 10:12 PM, Sean Owen  wrote:
> 
> Deprecated -- certainly and sooner than later.
> I don't have a good sense of the overhead of continuing to support
> Python 2; is it large enough to consider dropping it in Spark 3.0?
> 
>> On Wed, May 29, 2019 at 11:47 PM Xiangrui Meng  wrote:
>> 
>> Hi all,
>> 
>> I want to revive this old thread since no action was taken so far. If we 
>> plan to mark Python 2 as deprecated in Spark 3.0, we should do it as early 
>> as possible and let users know ahead. PySpark depends on Python, numpy, 
>> pandas, and pyarrow, all of which are sunsetting Python 2 support by 
>> 2020/01/01 per https://python3statement.org/. At that time we cannot really 
>> support Python 2 because the dependent libraries do not plan to make new 
>> releases, even for security reasons. So I suggest the following:
>> 
>> 1. Update Spark website and state that Python 2 is deprecated in Spark 3.0 
>> and its support will be removed in a release after 2020/01/01.
>> 2. Make a formal announcement to dev@ and users@.
>> 3. Add Apache Spark project to https://python3statement.org/ timeline.
>> 4. Update PySpark, check python version and print a deprecation warning if 
>> version < 3.
>> 
>> Any thoughts and suggestions?
>> 
>> Best,
>> Xiangrui
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


Re: Should python-2 be supported in Spark 3.0?

2019-05-29 Thread Sean Owen
Deprecated -- certainly and sooner than later.
I don't have a good sense of the overhead of continuing to support
Python 2; is it large enough to consider dropping it in Spark 3.0?

On Wed, May 29, 2019 at 11:47 PM Xiangrui Meng  wrote:
>
> Hi all,
>
> I want to revive this old thread since no action was taken so far. If we plan 
> to mark Python 2 as deprecated in Spark 3.0, we should do it as early as 
> possible and let users know ahead. PySpark depends on Python, numpy, pandas, 
> and pyarrow, all of which are sunsetting Python 2 support by 2020/01/01 per 
> https://python3statement.org/. At that time we cannot really support Python 2 
> because the dependent libraries do not plan to make new releases, even for 
> security reasons. So I suggest the following:
>
> 1. Update Spark website and state that Python 2 is deprecated in Spark 3.0 
> and its support will be removed in a release after 2020/01/01.
> 2. Make a formal announcement to dev@ and users@.
> 3. Add Apache Spark project to https://python3statement.org/ timeline.
> 4. Update PySpark, check python version and print a deprecation warning if 
> version < 3.
>
> Any thoughts and suggestions?
>
> Best,
> Xiangrui

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Should python-2 be supported in Spark 3.0?

2019-05-29 Thread Xiangrui Meng
Hi all,

I want to revive this old thread since no action has been taken so far. If we
plan to mark Python 2 as deprecated in Spark 3.0, we should do it as early
as possible and let users know ahead of time. PySpark depends on Python, numpy,
pandas, and pyarrow, all of which are sunsetting Python 2 support by
2020/01/01 per https://python3statement.org/. After that date we cannot really
support Python 2, because the dependent libraries do not plan to make new
releases, even for security fixes. So I suggest the following:

1. Update Spark website and state that Python 2 is deprecated in Spark 3.0
and its support will be removed in a release after 2020/01/01.
2. Make a formal announcement to dev@ and users@.
3. Add Apache Spark project to https://python3statement.org/ timeline.
4. Update PySpark to check the Python version and print a deprecation warning
if the version is < 3 (a minimal sketch follows below).
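
A minimal sketch of what such a runtime check could look like (the exact
placement and wording of the warning are assumptions for illustration, not an
agreed implementation):

    import sys
    import warnings

    # Hypothetical check, e.g. run when the PySpark session/context starts.
    if sys.version_info[0] < 3:
        warnings.warn(
            "Support for Python 2 is deprecated as of Spark 3.0 and will be "
            "removed in a future release. Please migrate to Python 3.",
            DeprecationWarning)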

Any thoughts and suggestions?

Best,
Xiangrui

On Mon, Sep 17, 2018 at 6:54 PM Erik Erlandson  wrote:

>
> I think that makes sense. The main benefit of deprecating *prior* to 3.0
> would be informational - making the community aware of the upcoming
> transition earlier. But there are other ways to start informing the
> community between now and 3.0, besides formal deprecation.
>
> I have some residual curiosity about what it might mean for a release like
> 2.4 to still be in its support lifetime after Py2 goes EOL. I asked Apache
> Legal  to comment. It is
> possible there are no issues with this at all.
>
>
> On Mon, Sep 17, 2018 at 4:26 PM, Reynold Xin  wrote:
>
>> i'd like to second that.
>>
>> if we want to communicate timeline, we can add to the release notes
>> saying py2 will be deprecated in 3.0, and removed in a 3.x release.
>>
>> --
>> excuse the brevity and lower case due to wrist injury
>>
>>
>> On Mon, Sep 17, 2018 at 4:24 PM Matei Zaharia 
>> wrote:
>>
>>> That’s a good point — I’d say there’s just a risk of creating a
>>> perception issue. First, some users might feel that this means they have to
>>> migrate now, which is before Python itself drops support; they might also
>>> be surprised that we did this in a minor release (e.g. might we drop Python
>>> 2 altogether in a Spark 2.5 if that later comes out?). Second, contributors
>>> might feel that this means new features no longer have to work with Python
>>> 2, which would be confusing. Maybe it’s OK on both fronts, but it just
>>> seems scarier for users to do this now if we do plan to have Spark 3.0 in
>>> the next 6 months anyway.
>>>
>>> Matei
>>>
>>> > On Sep 17, 2018, at 1:04 PM, Mark Hamstra 
>>> wrote:
>>> >
>>> > What is the disadvantage to deprecating now in 2.4.0? I mean, it
>>> doesn't change the code at all; it's just a notification that we will
>>> eventually cease supporting Py2. Wouldn't users prefer to get that
>>> notification sooner rather than later?
>>> >
>>> > On Mon, Sep 17, 2018 at 12:58 PM Matei Zaharia <
>>> matei.zaha...@gmail.com> wrote:
>>> > I’d like to understand the maintenance burden of Python 2 before
>>> deprecating it. Since it is not EOL yet, it might make sense to only
>>> deprecate it once it’s EOL (which is still over a year from now).
>>> Supporting Python 2+3 seems less burdensome than supporting, say, multiple
>>> Scala versions in the same codebase, so what are we losing out?
>>> >
>>> > The other thing is that even though Python core devs might not support
>>> 2.x later, it’s quite possible that various Linux distros will if moving
>>> from 2 to 3 remains painful. In that case, we may want Apache Spark to
>>> continue releasing for it despite the Python core devs not supporting it.
>>> >
>>> > Basically, I’d suggest to deprecate this in Spark 3.0 and then remove
>>> it later in 3.x instead of deprecating it in 2.4. I’d also consider looking
>>> at what other data science tools are doing before fully removing it: for
>>> example, if Pandas and TensorFlow no longer support Python 2 past some
>>> point, that might be a good point to remove it.
>>> >
>>> > Matei
>>> >
>>> > > On Sep 17, 2018, at 11:01 AM, Mark Hamstra 
>>> wrote:
>>> > >
>>> > > If we're going to do that, then we need to do it right now, since
>>> 2.4.0 is already in release candidates.
>>> > >
>>> > > On Mon, Sep 17, 2018 at 10:57 AM Erik Erlandson 
>>> wrote:
>>> > > I like Mark’s concept for deprecating Py2 starting with 2.4: It may
>>> seem like a ways off but even now there may be some spark versions
>>> supporting Py2 past the point where Py2 is no longer receiving security
>>> patches
>>> > >
>>> > >
>>> > > On Sun, Sep 16, 2018 at 12:26 PM Mark Hamstra <
>>> m...@clearstorydata.com> wrote:
>>> > > We could also deprecate Py2 already in the 2.4.0 release.
>>> > >
>>> > > On Sat, Sep 15, 2018 at 11:46 AM Erik Erlandson 
>>> wrote:
>>> > > In case this didn't make it onto this thread:
>>> > >
>>> > > There is a 3rd option, which is to deprecate Py2 for Spark-3.0, and
>>> remove it entirely on a later 3.x release.
>>> > >
>>> > > On 

Re: Upsert for hive tables

2019-05-29 Thread Aakash Basu
Don't you have a date/timestamp to handle updates? So you're talking about
CDC? If you have a datestamp, you can check whether the key(s) exist; if they
exist, check whether the timestamp matches; if it matches, ignore the record,
and if it doesn't, update it.
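
If it helps, a rough PySpark sketch of that check (the table and column names
- "key", "ts" - are hypothetical placeholders, not a drop-in implementation):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    delta = spark.table("table_a")    # incoming/changed rows (placeholder name)
    target = spark.table("table_b")   # existing rows (placeholder name)

    # Keep a delta row if its key is new, or the key exists but the timestamp
    # differs (i.e. the record actually changed).
    joined = delta.alias("d").join(
        target.alias("t"), F.col("d.key") == F.col("t.key"), "left")
    changed = joined.where(
        F.col("t.key").isNull() | (F.col("d.ts") != F.col("t.ts"))
    ).select("d.*")

    # Untouched target rows plus new/changed rows give the upserted result
    # (assumes both tables share the same schema).
    untouched = target.alias("t2").join(
        changed.alias("c"), F.col("t2.key") == F.col("c.key"), "left_anti")
    result = untouched.unionByName(changed)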

On Thu 30 May, 2019, 7:11 AM Genieliu,  wrote:

> Aren't step 1 and step 2 producing a copy of Table A?
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Upsert for hive tables

2019-05-29 Thread Genieliu
Aren't step 1 and step 2 producing a copy of Table A?



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



adding a column to a groupBy (dataframe)

2019-05-29 Thread Marcelo Valle
Hi all,

I am new to Spark and I am trying to write an application using dataframes
that normalizes data.

So I have a dataframe `denormalized_cities` with 3 columns: COUNTRY, CITY,
CITY_NICKNAME

Here is what I want to do:


   1. Map by country, then for each country generate a new ID and write to
   a new dataframe `countries`, which would have COUNTRY_ID, COUNTRY - country
   ID would be generated, probably using `monotonically_increasing_id`.
   2. For each country, write several lines on a new dataframe `cities`,
   which would have COUNTRY_ID, ID, CITY, CITY_NICKNAME. COUNTRY_ID would be
   the same generated on country table and ID would be another ID I generate.

What's the best way to do this, hopefully using only dataframes (no low
level RDDs) unless it's not possible?

I clearly see a map/reduce process where for each mapped KEY I generate a
row in the countries table with COUNTRY_ID, and for every value I write a row in
the cities table. But how do I implement this in an easy and efficient way?
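
For illustration, a minimal PySpark sketch of the two steps described above,
using DataFrames only (the source table name is a placeholder, and note that
monotonically_increasing_id() gives unique but non-contiguous IDs):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    # Placeholder source with columns COUNTRY, CITY, CITY_NICKNAME
    denormalized_cities = spark.table("denormalized_cities")

    # Step 1: one row per country, with a generated COUNTRY_ID.
    countries = (denormalized_cities
                 .select("COUNTRY").distinct()
                 .withColumn("COUNTRY_ID", F.monotonically_increasing_id()))

    # Step 2: join the generated COUNTRY_ID back and add a per-city ID.
    cities = (denormalized_cities
              .join(countries, "COUNTRY")
              .withColumn("ID", F.monotonically_increasing_id())
              .select("COUNTRY_ID", "ID", "CITY", "CITY_NICKNAME"))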

I thought about using a `GroupBy Country` and then using `collect` to
collect all values for that country, but then I don't know how to generate
the country ID, and I am not sure about the memory efficiency of `collect` for a
country with too many cities (bear in mind country/city is just an example;
my real entities are different).

Could anyone point me to the direction of a good solution?

Thanks,
Marcelo.



[Spark Streaming]: Spark Checkpointing: Content, Recovery and Idempotency

2019-05-29 Thread Sheel Pancholi
Hello,

I am trying to understand the *content* of a checkpoint and corresponding
recovery; understanding the process of checkpointing is obviously the
natural way of going about it and so I went over the following list:

   - medium post
   - SO
   - Spark docs
   - the very famous Github document by Jerry Leads


I still am struggling to understand what goes and sits on the disk at the
end of a checkpoint.

*My understanding of Spark Checkpointing:*

If you have really long DAGs and your spark cluster fails, checkpointing
helps by persisting intermediate state e.g. to HDFS. So, a DAG of 50
transformations can be reduced to 4-5 transformations with the help of
checkpointing. It breaks the DAG though.

*Checkpointing in Streaming*

My Spark Streaming job has a microbatch of 5 seconds. As I understand, a
new *job* is submitted by the *JobScheduler* every 5 secs that invokes the
*JobGenerator* to generate the *RDD* DAG for the new microbatch from the
*DStreamGraph*, while the *receiver* in the meantime keeps collecting the
data for the *next new* microbatch for the next cycle. If I enable
checkpointing, as I understand, it will periodically keep checkpointing the
"current state".

*Question:*

   1. What is this "state"? Is this the combination of *the base RDD and the
   state of the operators/transformations of the DAG for the present
   microbatch only*? So I have the following:

   ubatch 0 at T=0 > SUCCESS
   ubatch 1 at T=5 > SUCCESS
   ubatch 2 at T=10 ---> SUCCESS
   > Checkpointing kicks in now at T=12
   ubatch 3 at T=15 ---> SUCCESS
   ubatch 4 at T=20
   > Spark Cluster DOWN at T=23 => ubatch 4 FAILS!!!
   ...
   > Spark Cluster is restarted at *T=100*

   What specifically goes and sits on the disk as a result of checkpointing
   at *T=12*? Will it just store the *present state of operators of the DAG
   for ubatch 2*?

   a. If yes, then during recovery at *T=100*, the last checkpoint
   available is at *T=12*. What happens to the *ubatch 3* at *T=15* which
   was already processed successfully? Does the application reprocess *ubatch
   3* and handle idempotency here? If yes, do we go to the streaming source
   e.g. Kafka and rewind the offset to be able to replay the contents starting
   from the *ubatch 3*?

   b. If no, then what exactly goes into the checkpoint directory at T=12?

https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency


Regards
Sheel


Re: Upsert for hive tables

2019-05-29 Thread Aakash Basu
Why don't you simply copy the whole delta data set (Table A) into a stage table
(the temp table in your case) and insert based on a *WHERE NOT EXISTS* check
on the primary/composite key that already exists in Table B?

That's faster and does the reconciliation job smoothly enough.

Others, any better input?
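
A rough sketch of that idea in Spark SQL (table and key names are
placeholders; this only covers the insert side, and it is worth checking the
plan the NOT EXISTS subquery produces):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Stage the delta (Table A) first, then insert only the keys that
    # Table B does not have yet.
    spark.sql("CREATE TABLE stage_a AS SELECT * FROM table_a")
    spark.sql("""
        INSERT INTO table_b
        SELECT s.*
        FROM stage_a s
        WHERE NOT EXISTS (SELECT 1 FROM table_b b WHERE b.key = s.key)
    """)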

On Wed 29 May, 2019, 10:50 PM Tomasz Krol,  wrote:

> Hey Guys,
>
> I am wondering what would be your approach to following scenario:
>
> I have two tables - one (Table A) is relatively small (e.g 50GB) and
> second one (Table B) much bigger (e.g. 3TB). Both are parquet tables.
>
>  I want to ADD all records from Table A to Table B which dont exist in
> Table B yet. I use only one field (e.g. key) to check existence for
> specific record.
>
> Then I want to UPDATE (by values from Table A) all records in Table B
> which also exist in Table A. To determine if specific record exist I use
> also the same "key" field.
>
> To achieve above I run following sql queries:
>
> 1. Find existing records and insert into temp table
>
> insert into temp_table select a.cols from Table A a left semi join Table B
> b on a.key = b.key
>
> 2. Find new records and insert them into temp table
>
> insert into temp_table select a.cols from Table A a left anti join Table B
> b on a.key = b.key
>
> 3. Find existing records in Table B which dont exist in   Table A
>
> insert into temp_table select b.cols from Table B b left anti join Table A
> a on a.key = b.key
>
> In that way I built Table B updated with records from Table A.
> However, the problem here is the step 3, because I am inserting almost 3
> TB of data that takes obviously some time.
> I was trying different approaches but no luck.
>
> I am wondering whats your ideas how can we perform this scenario
> efficiently in Spark?
>
> Cheers
>
> Tom
> --
> Tomasz Krol
> patric...@gmail.com
>


Spark Streaming: Checkpointing Recovery and Idempotency

2019-05-29 Thread Sheel Pancholi
Hello,

I am trying to understand the *content* of a checkpoint and corresponding
recovery; understanding the process of checkpointing is obviously the
natural way of going about it and so I went over the following list:

   - medium post
   - SO
   - Spark docs
   - the very famous Github document by Jerry Leads


I still am struggling to understand what goes and sits on the disk at the
end of a checkpoint.

*My understanding of Spark Checkpointing:*

If you have really long DAGs and your spark cluster fails, checkpointing
helps by persisting intermediate state e.g. to HDFS. So, a DAG of 50
transformations can be reduced to 4-5 transformations with the help of
checkpointing. It breaks the DAG though.

*Checkpointing in Streaming*

My Spark Streaming job has a microbatch of 5 seconds. As I understand, a
new *job* is submitted by the *JobScheduler* every 5 secs that invokes the
*JobGenerator* to generate the *RDD* DAG for the new microbatch from the
*DStreamGraph*, while the *receiver* in the meantime keeps collecting the
data for the *next new* microbatch for the next cycle. If I enable
checkpointing, as I understand, it will periodically keep checkpointing the
"current state".

*Question:*

   1. What is this "state"? Is this the combination of *the base RDD and the
   state of the operators/transformations of the DAG for the present
   microbatch only*? So I have the following:

   ubatch 0 at T=0 > SUCCESS
   ubatch 1 at T=5 > SUCCESS
   ubatch 2 at T=10 ---> SUCCESS
   > Checkpointing kicks in now at T=12
   ubatch 3 at T=15 ---> SUCCESS
   ubatch 4 at T=20
   > Spark Cluster DOWN at T=23 => ubatch 4 FAILS!!!
   ...
   > Spark Cluster is restarted at *T=100*

   What specifically goes and sits on the disk as a result of checkpointing
   at *T=12*? Will it just store the *present state of operators of the DAG
   for ubatch 2*?

   a. If yes, then during recovery at *T=100*, the last checkpoint
   available is at *T=12*. What happens to the *ubatch 3* at *T=15* which
   was already processed successfully? Does the application reprocess *ubatch
   3* and handle idempotency here? If yes, do we go to the streaming source
   e.g. Kafka and rewind the offset to be able to replay the contents starting
   from the *ubatch 3*?

   b. If no, then what exactly goes into the checkpoint directory at T=12?

https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency


Regards


Spark Streaming: Checkpoint, Recovery and Idempotency

2019-05-29 Thread sheelstera
Hello,

I am trying to understand the content of a checkpoint and corresponding
recovery.

*My understanding of Spark Checkpointing:*
If you have really long DAGs and your spark cluster fails, checkpointing
helps by persisting intermediate state e.g. to HDFS. So, a DAG of 50
transformations can be reduced to 4-5 transformations with the help of
checkpointing. It breaks the DAG though.

*Checkpointing in Streaming*
My Spark Streaming job has a microbatch of 5 seconds. As I understand, a new
job is submitted every 5 secs on the Eventloop that invokes the JobGenerator
to generate the RDD DAG for the new microbatch from the DStreamGraph, while
the receiver in the meantime keeps collecting the data for the next new
microbatch for the next cycle. If I enable checkpointing, as I understand,
it will periodically keep checkpointing the "current state".

*Question:*
What is this "state"? Is this the combination of the base RDD and the state
of the operators/transformations of the DAG for the present microbatch only?
So I have the following:

ubatch 0 at T=0 > SUCCESS
ubatch 1 at T=5 > SUCCESS
ubatch 2 at T=10 ---> SUCCESS
> Checkpointing kicks in now at T=12
ubatch 3 at T=15 ---> SUCCESS
ubatch 4 at T=20
> Spark Cluster DOWN at T=23 => ubatch 4 FAILS!!!
...
> Spark Cluster is restarted at *T=100*

What specifically goes and sits on the disk as a result of checkpointing at
T=12? Will it just store the present state of operators of the DAG for
ubatch 2?

a. If yes, then during recovery at T=100, the last checkpoint available is
at T=12. What happens to the ubatch 3 at T=15 which was already processed
successfully? Does the application reprocess ubatch 3 and handle idempotency
here? If yes, do we go to the streaming source e.g. Kafka and rewind the
offset to be able to replay the contents starting from the ubatch 3?

b. If no, then what exactly goes into the checkpoint directory at T=12?

https://stackoverflow.com/questions/56362347/spark-checkpointing-content-recovery-and-idempotency
  

Regards



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Upsert for hive tables

2019-05-29 Thread Tomasz Krol
Hey Aakash,

That will work for records which don't exist yet in the target table. What
about records which have to be updated?

As I mentioned, I want to do an upsert. That means I want to add records that
don't exist yet and update those which already exist.

Thanks

Tom

On Wed 29 May 2019 at 18:39, Aakash Basu  wrote:

> Why don't you simply copy whole of delta data (Table A) into a stage table
> (temp table in your case) and insert depending on a *WHERE NOT EXISTS* check
> on primary key/composite key which already exists in the table B?
>
> That's faster and does the reconciliation job smoothly enough.
>
> Others, any better input?
>
> On Wed 29 May, 2019, 10:50 PM Tomasz Krol,  wrote:
>
>> Hey Guys,
>>
>> I am wondering what would be your approach to following scenario:
>>
>> I have two tables - one (Table A) is relatively small (e.g 50GB) and
>> second one (Table B) much bigger (e.g. 3TB). Both are parquet tables.
>>
>>  I want to ADD all records from Table A to Table B which dont exist in
>> Table B yet. I use only one field (e.g. key) to check existence for
>> specific record.
>>
>> Then I want to UPDATE (by values from Table A) all records in Table B
>> which also exist in Table A. To determine if specific record exist I use
>> also the same "key" field.
>>
>> To achieve above I run following sql queries:
>>
>> 1. Find existing records and insert into temp table
>>
>> insert into temp_table select a.cols from Table A a left semi join Table
>> B b on a.key = b.key
>>
>> 2. Find new records and insert them into temp table
>>
>> insert into temp_table select a.cols from Table A a left anti join Table
>> B b on a.key = b.key
>>
>> 3. Find existing records in Table B which dont exist in   Table A
>>
>> insert into temp_table select b.cols from Table B b left anti join Table
>> A a on a.key = b.key
>>
>> In that way I built Table B updated with records from Table A.
>> However, the problem here is the step 3, because I am inserting almost 3
>> TB of data that takes obviously some time.
>> I was trying different approaches but no luck.
>>
>> I am wondering whats your ideas how can we perform this scenario
>> efficiently in Spark?
>>
>> Cheers
>>
>> Tom
>> --
>> Tomasz Krol
>> patric...@gmail.com
>>
> --
Tomasz Krol
patric...@gmail.com


Upsert for hive tables

2019-05-29 Thread Tomasz Krol
Hey Guys,

I am wondering what would be your approach to the following scenario:

I have two tables - one (Table A) is relatively small (e.g. 50GB) and the
second one (Table B) is much bigger (e.g. 3TB). Both are parquet tables.

I want to ADD all records from Table A to Table B which don't exist in
Table B yet. I use only one field (e.g. key) to check existence for a
specific record.

Then I want to UPDATE (with values from Table A) all records in Table B which
also exist in Table A. To determine whether a specific record exists I also
use the same "key" field.

To achieve the above I run the following SQL queries:

1. Find existing records and insert into temp table

insert into temp_table select a.cols from Table A a left semi join Table B
b on a.key = b.key

2. Find new records and insert them into temp table

insert into temp_table select a.cols from Table A a left anti join Table B
b on a.key = b.key

3. Find existing records in Table B which don't exist in Table A

insert into temp_table select b.cols from Table B b left anti join Table A
a on a.key = b.key

In that way I build Table B updated with records from Table A.
However, the problem here is step 3, because I am inserting almost 3 TB
of data, which obviously takes some time.
I have tried different approaches but no luck.

I am wondering what your ideas are on how we can perform this scenario
efficiently in Spark?

Cheers

Tom
-- 
Tomasz Krol
patric...@gmail.com


Re: Does Spark SQL has match_recognize?

2019-05-29 Thread kant kodali
Nope, not at all.

On Sun, May 26, 2019 at 8:15 AM yeikel valdes  wrote:

> Isn't match_recognize just a filter?
>
> df.filter(predicate)?
>
>
>  On Sat, 25 May 2019 12:55:47 -0700 * kanth...@gmail.com
>  * wrote 
>
> Hi All,
>
> Does Spark SQL has match_recognize? I am not sure why CEP seems to be
> neglected I believe it is one of the most useful concepts in the Financial
> applications!
>
> Is there a plan to support it?
>
> Thanks!
>
>
>


Re: Executors idle, driver heap exploding and maxing only 1 cpu core

2019-05-29 Thread Akshay Bhardwaj
Hi,

A few thoughts to add to Nicholas' apt reply.

We were loading multiple files from AWS S3 in our Spark application. When
the Spark step that loads the files is called, the driver spends significant
time fetching the exact paths of the files from AWS S3, especially because we
specified S3 paths as glob patterns (e.g.
s3a://bucket-name/folder1/data1/2019-05*/*, meaning that I want to
reference all sub-files/folders for the month of May 2019).

I was able to verify this at the time by running the "iftop" Linux
command, which showed a lot of network calls to *s3.amazon.com* servers.

This phenomenon occurs as soon as I define the load-files transformation,
even when no save/collect action has been called in my Spark pipeline.
The Spark UI does not show any stage in running mode, and only once all
the network calls to AWS S3 are completed does the Spark UI show that the
call to load the files completed in 2 seconds.
My Spark job "seemed to be paused" for over half an hour, depending on the
number of files. I believe this happens in the underlying AWS SDK/Azure SDK
libraries that Spark uses:
they need to fetch the exact file paths in the object store before the files
can be referenced in Spark.


Since you mention you are using Azure blob files, this should explain the
behaviour where everything seems to stop. You can reduce this time by
ensuring you have a small number of large files in your blob store to read
from, rather than the other way around.
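
For context, this is the kind of read that triggers the listing (a minimal
sketch; the bucket/path is hypothetical, and the point is that listing the
objects matching the glob happens when the DataFrame is defined, before any
action runs):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Defining the DataFrame already lists every object matching the glob,
    # which is where the long "paused" phase on the driver comes from.
    df = spark.read.parquet("s3a://bucket-name/folder1/data1/2019-05*/*")

    df.count()  # the distributed computation itself only starts here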

Akshay Bhardwaj
+91-97111-33849


On Thu, May 23, 2019 at 11:13 PM Nicholas Hakobian <
nicholas.hakob...@rallyhealth.com> wrote:

> One potential case that can cause this is the optimizer being a little
> overzealous with determining if a table can be broadcasted or not. Have you
> checked the UI or query plan to see if any steps include a
> BroadcastHashJoin? It's possible that the optimizer thinks that it should be
> able to fit the table in memory from looking at its size on disk, but it
> actually cannot fit in memory. In this case you might want to look at
> tuning the autoBroadcastJoinThreshold.
>
> Another potential case is that at the step it looks like the driver is
> "hanging" its attempting to load in a data source that is backed by a very
> large number of files. Spark maintains a cache of file paths for a data
> source to determine task splits, and we've seen the driver appear to hang
> and/or crash if you try to load in thousands (or more) of tiny files per
> partition, and you have a large number of partitions.
>
> Hope this helps.
>
> Nicholas Szandor Hakobian, Ph.D.
> Principal Data Scientist
> Rally Health
> nicholas.hakob...@rallyhealth.com
>
>
> On Thu, May 23, 2019 at 7:36 AM Ashic Mahtab  wrote:
>
>> Hi,
>> We have a quite long winded Spark application we inherited with many
>> stages. When we run on our spark cluster, things start off well enough.
>> Workers are busy, lots of progress made, etc. etc. However, 30 minutes into
>> processing, we see CPU usage of the workers drop drastically. At this time,
>> we also see that the driver is maxing out exactly one core (though we've
>> given it more than one), and its ram usage is creeping up. At this time,
>> there's no logs coming out on the driver. Everything seems to stop, and
>> then it suddenly starts working, and the workers start working again. The
>> driver ram doesn't go down, but flatlines. A few minutes later, the same
>> thing happens again - the world seems to stop. However, the driver soon
>> crashes with an out of memory exception.
>>
>> What could be causing this sort of behaviour on the driver? We don't have
>> any collect() or similar functions in the code. We're reading in from Azure
>> blobs, processing, and writing back to Azure blobs. Where should we start
>> in trying to get to the bottom of this? We're running Spark 2.4.1 in a
>> stand-alone cluster.
>>
>> Thanks,
>> Ashic.
>>
>