Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Mich Talebzadeh
OK, good news. You have made some progress here :)

bzip2 works here because it is splittable: it is block-oriented, whereas gzip
is stream-oriented. I also noticed that you are creating a managed ORC
table. You can bucket and partition an ORC (Optimized Row Columnar) table.
An example below:


DROP TABLE IF EXISTS dummy;

CREATE TABLE dummy (
 ID INT
   , CLUSTERED INT
   , SCATTERED INT
   , RANDOMISED INT
   , RANDOM_STRING VARCHAR(50)
   , SMALL_VC VARCHAR(10)
   , PADDING  VARCHAR(10)
)
CLUSTERED BY (ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES (
"orc.create.index"="true",
"orc.bloom.filter.columns"="ID",
"orc.bloom.filter.fpp"="0.05",
"orc.compress"="SNAPPY",
"orc.stripe.size"="16777216",
"orc.row.index.stride"="1" )
;
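
For what it's worth, a rough PySpark equivalent of the bucketed ORC table above
(a sketch only, not from this thread: "dummy_staging" is a hypothetical source
table, and note that Spark's own bucketing layout is not identical to Hive's
CLUSTERED BY):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Hypothetical source DataFrame; column names mirror the DDL above.
df = spark.table("dummy_staging")

# Bucketed, SNAPPY-compressed ORC table written through the DataFrame API.
# bucketBy() only works together with saveAsTable(), i.e. a managed table.
(df.write
   .format("orc")
   .option("compression", "snappy")
   .bucketBy(256, "ID")
   .sortBy("ID")
   .mode("overwrite")
   .saveAsTable("dummy"))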

HTH

Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


https://en.everybodywiki.com/Mich_Talebzadeh




On Mon, 26 Jun 2023 at 19:35, Patrick Tucci  wrote:

> Hi Mich,
>
> Thanks for the reply. I started running ANALYZE TABLE on the external
> table, but the progress was very slow. The stage had only read about 275MB
> in 10 minutes. That equates to about 5.5 hours just to analyze the table.
>
> This might just be the reality of trying to process a 240m record file
> with 80+ columns, unless there's an obvious issue with my setup that
> someone sees. The solution is likely going to involve increasing
> parallelization.
>
> To that end, I extracted and re-zipped this file in bzip. Since bzip is
> splittable and gzip is not, Spark can process the bzip file in parallel.
> The same CTAS query only took about 45 minutes. This is still a bit slower
> than I had hoped, but the import from bzip fully utilized all available
> cores. So we can give the cluster more resources if we need the process to
> go faster.
>
> Patrick
>
> On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> OK for now have you analyzed statistics in Hive external table
>>
>> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
>> COLUMNS;
>> spark-sql (default)> DESC EXTENDED test.stg_t2;
>>
>> Hive external tables have little optimization
>>
>> HTH
>>
>>
>>
>> Mich Talebzadeh,
>> Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>> London
>> United Kingdom
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>>
>> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>>> and 64GB of RAM.
>>>
>>> I'm trying to process a large pipe delimited file that has been
>>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>>> columns). I uploaded the gzipped file to HDFS and created an external table
>>> using the attached script. I tried two simpler queries on the same table,
>>> and they finished in ~5 and ~10 minutes respectively:
>>>
>>> SELECT COUNT(*) FROM ClaimsImport;
>>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>>
>>> However, when I tried to create a table stored as ORC using this table
>>> as the input, the query ran for almost 4 hours:
>>>
>>> CREATE TABLE Claims STORED AS ORC
>>> AS
>>> SELECT *
>>> FROM ClaimsImport
>>> --Exclude the header record
>>> WHERE ClaimID <> 'ClaimID';
>>>
>>> [image: image.png]
>>>
>>> Why is there such a speed disparity between these different operations?
>>> I understand that this job cannot be parallelized because the file is
>>> compressed with gzip. I also understand that creating an ORC table from the
>>> input will take more time than a simple COUNT(*). But it doesn't feel like
>>> the CREATE TABLE operation should take more than 24x longer than a simple
>>> SELECT COUNT(*) statement.
>>>
>>> Thanks for any help. Please let me know if I can provide any additional
>>> information.
>>>
>>> Patrick
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hi Mich,

Thanks for the reply. I started running ANALYZE TABLE on the external
table, but the progress was very slow. The stage had only read about 275MB
in 10 minutes. That equates to about 5.5 hours just to analyze the table.

This might just be the reality of trying to process a 240m record file with
80+ columns, unless there's an obvious issue with my setup that someone
sees. The solution is likely going to involve increasing parallelization.

To that end, I extracted and re-zipped this file in bzip. Since bzip is
splittable and gzip is not, Spark can process the bzip file in parallel.
The same CTAS query only took about 45 minutes. This is still a bit slower
than I had hoped, but the import from bzip fully utilized all available
cores. So we can give the cluster more resources if we need the process to
go faster.
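
(If re-compressing ever isn't an option, one hedged idea, not something tested in
this thread: read the gzip file once on a single core and repartition before the
ORC write, so at least the encode/write side runs in parallel. The path and
numbers below are made up.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# gzip is not splittable, so this scan runs as a single task...
staged = (spark.read
          .option("sep", "|")
          .option("header", "true")
          .csv("hdfs:///data/claims/claims.csv.gz")  # hypothetical path
          .repartition(64))  # ...but the rows are then spread over 64 tasks

# The expensive ORC encoding and writing now uses all available cores.
staged.write.format("orc").mode("overwrite").saveAsTable("Claims")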

Patrick

On Mon, Jun 26, 2023 at 12:52 PM Mich Talebzadeh 
wrote:

> OK for now have you analyzed statistics in Hive external table
>
> spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
> COLUMNS;
> spark-sql (default)> DESC EXTENDED test.stg_t2;
>
> Hive external tables have little optimization
>
> HTH
>
>
>
> Mich Talebzadeh,
> Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
>
> On Mon, 26 Jun 2023 at 16:33, Patrick Tucci 
> wrote:
>
>> Hello,
>>
>> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
>> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
>> and 64GB of RAM.
>>
>> I'm trying to process a large pipe delimited file that has been
>> compressed with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85
>> columns). I uploaded the gzipped file to HDFS and created an external table
>> using the attached script. I tried two simpler queries on the same table,
>> and they finished in ~5 and ~10 minutes respectively:
>>
>> SELECT COUNT(*) FROM ClaimsImport;
>> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>>
>> However, when I tried to create a table stored as ORC using this table as
>> the input, the query ran for almost 4 hours:
>>
>> CREATE TABLE Claims STORED AS ORC
>> AS
>> SELECT *
>> FROM ClaimsImport
>> --Exclude the header record
>> WHERE ClaimID <> 'ClaimID';
>>
>> [image: image.png]
>>
>> Why is there such a speed disparity between these different operations? I
>> understand that this job cannot be parallelized because the file is
>> compressed with gzip. I also understand that creating an ORC table from the
>> input will take more time than a simple COUNT(*). But it doesn't feel like
>> the CREATE TABLE operation should take more than 24x longer than a simple
>> SELECT COUNT(*) statement.
>>
>> Thanks for any help. Please let me know if I can provide any additional
>> information.
>>
>> Patrick
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Mich Talebzadeh
OK, for now: have you analyzed statistics on the Hive external table?

spark-sql (default)> ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL
COLUMNS;
spark-sql (default)> DESC EXTENDED test.stg_t2;

Hive external tables have little optimization
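
(The same commands from PySpark, as a minimal sketch, assuming a Hive-enabled
SparkSession and the test.stg_t2 table above:)

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("ANALYZE TABLE test.stg_t2 COMPUTE STATISTICS FOR ALL COLUMNS")

# Table-level stats (size in bytes / row count) show up under "Statistics":
(spark.sql("DESCRIBE EXTENDED test.stg_t2")
      .filter("col_name = 'Statistics'")
      .show(truncate=False))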

HTH



Mich Talebzadeh,
Solutions Architect/Engineering Lead
Palantir Technologies Limited
London
United Kingdom


https://en.everybodywiki.com/Mich_Talebzadeh




On Mon, 26 Jun 2023 at 16:33, Patrick Tucci  wrote:

> Hello,
>
> I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master
> node has 2 cores and 8GB of RAM. There is a single worker node with 8 cores
> and 64GB of RAM.
>
> I'm trying to process a large pipe delimited file that has been compressed
> with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
> uploaded the gzipped file to HDFS and created an external table using the
> attached script. I tried two simpler queries on the same table, and they
> finished in ~5 and ~10 minutes respectively:
>
> SELECT COUNT(*) FROM ClaimsImport;
> SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;
>
> However, when I tried to create a table stored as ORC using this table as
> the input, the query ran for almost 4 hours:
>
> CREATE TABLE Claims STORED AS ORC
> AS
> SELECT *
> FROM ClaimsImport
> --Exclude the header record
> WHERE ClaimID <> 'ClaimID';
>
> [image: image.png]
>
> Why is there such a speed disparity between these different operations? I
> understand that this job cannot be parallelized because the file is
> compressed with gzip. I also understand that creating an ORC table from the
> input will take more time than a simple COUNT(*). But it doesn't feel like
> the CREATE TABLE operation should take more than 24x longer than a simple
> SELECT COUNT(*) statement.
>
> Thanks for any help. Please let me know if I can provide any additional
> information.
>
> Patrick
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Spark-Sql - Slow Performance With CTAS and Large Gzipped File

2023-06-26 Thread Patrick Tucci
Hello,

I'm using Spark 3.4.0 in standalone mode with Hadoop 3.3.5. The master node
has 2 cores and 8GB of RAM. There is a single worker node with 8 cores and
64GB of RAM.

I'm trying to process a large pipe delimited file that has been compressed
with gzip (9.2GB zipped, ~58GB unzipped, ~241m records, ~85 columns). I
uploaded the gzipped file to HDFS and created an external table using the
attached script. I tried two simpler queries on the same table, and they
finished in ~5 and ~10 minutes respectively:

SELECT COUNT(*) FROM ClaimsImport;
SELECT COUNT(*) FROM ClaimsImport WHERE ClaimLineID = 1;

However, when I tried to create a table stored as ORC using this table as
the input, the query ran for almost 4 hours:

CREATE TABLE Claims STORED AS ORC
AS
SELECT *
FROM ClaimsImport
--Exclude the header record
WHERE ClaimID <> 'ClaimID';

[image: image.png]

Why is there such a speed disparity between these different operations? I
understand that this job cannot be parallelized because the file is
compressed with gzip. I also understand that creating an ORC table from the
input will take more time than a simple COUNT(*). But it doesn't feel like
the CREATE TABLE operation should take more than 24x longer than a simple
SELECT COUNT(*) statement.
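
(One way to confirm the gzip theory, as a sketch assuming the same ClaimsImport
table: check how many input partitions the scan actually gets. A single gzipped
file typically collapses to one partition, so the whole ~58GB is decompressed
and parsed by one task.)

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# Expected to print 1 for a single, non-splittable gzip file.
print(spark.table("ClaimsImport").rdd.getNumPartitions())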

Thanks for any help. Please let me know if I can provide any additional
information.

Patrick


Create Table.sql
Description: Binary data

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org

spark perform slow in eclipse rich client platform

2021-07-09 Thread jianxu
Hi There;


I wonder if anyone has experience running a Spark app from an Eclipse Rich Client
Platform application in Java. The same Spark code runs much more slowly from the
Eclipse Rich Client Platform than it does as a normal Java program in Eclipse
without the Rich Client Platform.


Appreciate any input on the matter.


Best Regards,


Jian Xu



Re: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

2021-03-10 Thread 钟雨
Hi Pankaj,

Can you share your detailed code and job/stage info? Which stage is slow?


Pankaj Bhootra  wrote on Wed, 10 Mar 2021 at 12:32 PM:

> Hi,
>
> Could someone please revert on this?
>
>
> Thanks
> Pankaj Bhootra
>
>
> On Sun, 7 Mar 2021, 01:22 Pankaj Bhootra,  wrote:
>
>> Hello Team
>>
>> I am new to Spark and this question may be a possible duplicate of the
>> issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
>>
>> We have a large dataset partitioned by calendar date, and within each
>> date partition, we are storing the data as *parquet* files in 128 parts.
>>
>> We are trying to run aggregation on this dataset for 366 dates at a time
>> with Spark SQL on spark version 2.3.0, hence our Spark job is reading
>> 366*128=46848 partitions, all of which are parquet files. There is
>> currently no *_metadata* or *_common_metadata* file(s) available for
>> this dataset.
>>
>> The problem we are facing is that when we try to run *spark.read.parquet* on
>> the above 46848 partitions, our data reads are extremely slow. It takes a
>> long time to run even a simple map task (no shuffling) without any
>> aggregation or group by.
>>
>> I read through the above issue and I think I perhaps generally understand
>> the ideas around *_common_metadata* file. But the above issue was raised
>> for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation
>> related to this metadata file so far.
>>
>> I would like to clarify:
>>
>>1. What's the latest, best practice for reading large number of
>>parquet files efficiently?
>>2. Does this involve using any additional options with
>>spark.read.parquet? How would that work?
>>3. Are there other possible reasons for slow data reads apart from
>>reading metadata for every part? We are basically trying to migrate our
>>existing spark pipeline from using csv files to parquet, but from my
>>hands-on so far, it seems that parquet's read time is slower than csv? 
>> This
>>seems contradictory to popular opinion that parquet performs better in
>>terms of both computation and storage?
>>
>>
>> Thanks
>> Pankaj Bhootra
>>
>>
>>
>> -- Forwarded message -
>> From: Takeshi Yamamuro (Jira) 
>> Date: Sat, 6 Mar 2021, 20:02
>> Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark
>> Extremely Slow for Large Number of Files?
>> To: 
>>
>>
>>
>> [
>> https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296528#comment-17296528
>> ]
>>
>> Takeshi Yamamuro commented on SPARK-34648:
>> --
>>
>> Please use the mailing list (user@spark.apache.org) instead. This is not
>> a right place to ask questions.
>>
>> > Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
>> > 
>> >
>> > Key: SPARK-34648
>> > URL: https://issues.apache.org/jira/browse/SPARK-34648
>> > Project: Spark
>> >  Issue Type: Question
>> >  Components: SQL
>> >Affects Versions: 2.3.0
>> >Reporter: Pankaj Bhootra
>> >Priority: Major
>> >
>> > Hello Team
>> > I am new to Spark and this question may be a possible duplicate of the
>> issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
>> > We have a large dataset partitioned by calendar date, and within each
>> date partition, we are storing the data as *parquet* files in 128 parts.
>> > We are trying to run aggregation on this dataset for 366 dates at a
>> time with Spark SQL on spark version 2.3.0, hence our Spark job is reading
>> 366*128=46848 partitions, all of which are parquet files. There is
>> currently no *_metadata* or *_common_metadata* file(s) available for this
>> dataset.
>> > The problem we are facing is that when we try to run
>> *spark.read.parquet* on the above 46848 partitions, our data reads are
>> extremely slow. It takes a long time to run even a simple map task (no
>> shuffling) without any aggregation or group by.
>> > I read through the above issue and I think I perhaps generally
>> understand the ideas around *_common_metadata* file. But the above issue
>> was raised for Spark 1.3.1 and for Spark 2.3.0, I have not found any
>> d

Re: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

2021-03-09 Thread Pankaj Bhootra
Hi,

Could someone please revert on this?


Thanks
Pankaj Bhootra


On Sun, 7 Mar 2021, 01:22 Pankaj Bhootra,  wrote:

> Hello Team
>
> I am new to Spark and this question may be a possible duplicate of the
> issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
>
> We have a large dataset partitioned by calendar date, and within each date
> partition, we are storing the data as *parquet* files in 128 parts.
>
> We are trying to run aggregation on this dataset for 366 dates at a time
> with Spark SQL on spark version 2.3.0, hence our Spark job is reading
> 366*128=46848 partitions, all of which are parquet files. There is
> currently no *_metadata* or *_common_metadata* file(s) available for this
> dataset.
>
> The problem we are facing is that when we try to run *spark.read.parquet* on
> the above 46848 partitions, our data reads are extremely slow. It takes a
> long time to run even a simple map task (no shuffling) without any
> aggregation or group by.
>
> I read through the above issue and I think I perhaps generally understand
> the ideas around *_common_metadata* file. But the above issue was raised
> for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation
> related to this metadata file so far.
>
> I would like to clarify:
>
>1. What's the latest, best practice for reading large number of
>parquet files efficiently?
>2. Does this involve using any additional options with
>spark.read.parquet? How would that work?
>3. Are there other possible reasons for slow data reads apart from
>reading metadata for every part? We are basically trying to migrate our
>existing spark pipeline from using csv files to parquet, but from my
>hands-on so far, it seems that parquet's read time is slower than csv? This
>seems contradictory to popular opinion that parquet performs better in
>terms of both computation and storage?
>
>
> Thanks
> Pankaj Bhootra
>
>
>
> -- Forwarded message -
> From: Takeshi Yamamuro (Jira) 
> Date: Sat, 6 Mar 2021, 20:02
> Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark
> Extremely Slow for Large Number of Files?
> To: 
>
>
>
> [
> https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296528#comment-17296528
> ]
>
> Takeshi Yamamuro commented on SPARK-34648:
> ----------
>
> Please use the mailing list (user@spark.apache.org) instead. This is not
> a right place to ask questions.
>
> > Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
> > 
> >
> > Key: SPARK-34648
> > URL: https://issues.apache.org/jira/browse/SPARK-34648
> > Project: Spark
> >  Issue Type: Question
> >  Components: SQL
> >Affects Versions: 2.3.0
> >Reporter: Pankaj Bhootra
> >Priority: Major
> >
> > Hello Team
> > I am new to Spark and this question may be a possible duplicate of the
> issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
> > We have a large dataset partitioned by calendar date, and within each
> date partition, we are storing the data as *parquet* files in 128 parts.
> > We are trying to run aggregation on this dataset for 366 dates at a time
> with Spark SQL on spark version 2.3.0, hence our Spark job is reading
> 366*128=46848 partitions, all of which are parquet files. There is
> currently no *_metadata* or *_common_metadata* file(s) available for this
> dataset.
> > The problem we are facing is that when we try to run
> *spark.read.parquet* on the above 46848 partitions, our data reads are
> extremely slow. It takes a long time to run even a simple map task (no
> shuffling) without any aggregation or group by.
> > I read through the above issue and I think I perhaps generally
> understand the ideas around *_common_metadata* file. But the above issue
> was raised for Spark 1.3.1 and for Spark 2.3.0, I have not found any
> documentation related to this metadata file so far.
> > I would like to clarify:
> >  # What's the latest, best practice for reading large number of parquet
> files efficiently?
> >  # Does this involve using any additional options with
> spark.read.parquet? How would that work?
> >  # Are there other possible reasons for slow data reads apart from
> reading metadata for every part? We are basically trying to migrate our
> existing spark pipeline from using csv files to parquet, but from my
> hands-on so far, it seems that parquet's read time is slower than csv? This
> seems contradictory to popular opinion that parquet performs better in
> terms of both computation and storage?
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.3.4#803005)
>


Fwd: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

2021-03-06 Thread Pankaj Bhootra
Hello Team

I am new to Spark and this question may be a possible duplicate of the
issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347

We have a large dataset partitioned by calendar date, and within each date
partition, we are storing the data as *parquet* files in 128 parts.

We are trying to run aggregation on this dataset for 366 dates at a time
with Spark SQL on spark version 2.3.0, hence our Spark job is reading
366*128=46848 partitions, all of which are parquet files. There is
currently no *_metadata* or *_common_metadata* file(s) available for this
dataset.

The problem we are facing is that when we try to run *spark.read.parquet* on
the above 46848 partitions, our data reads are extremely slow. It takes a
long time to run even a simple map task (no shuffling) without any
aggregation or group by.

I read through the above issue and I think I perhaps generally understand
the ideas around *_common_metadata* file. But the above issue was raised
for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation
related to this metadata file so far.
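
For concreteness, this is the kind of read I mean (a sketch; the schema and base
path here are made up):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

spark = SparkSession.builder.getOrCreate()

# With an explicit schema, Spark can skip schema inference over the part files.
schema = StructType([
    StructField("event_date", StringType()),
    StructField("value", DoubleType()),
])

df = (spark.read
      .schema(schema)
      .option("mergeSchema", "false")  # don't reconcile schemas across all 46848 files
      .parquet("hdfs:///warehouse/events"))  # hypothetical base path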

I would like to clarify:

   1. What's the latest, best practice for reading large number of parquet
   files efficiently?
   2. Does this involve using any additional options with
   spark.read.parquet? How would that work?
   3. Are there other possible reasons for slow data reads apart from
   reading metadata for every part? We are basically trying to migrate our
   existing spark pipeline from using csv files to parquet, but from my
   hands-on so far, it seems that parquet's read time is slower than csv? This
   seems contradictory to popular opinion that parquet performs better in
   terms of both computation and storage?


Thanks
Pankaj Bhootra



-- Forwarded message -
From: Takeshi Yamamuro (Jira) 
Date: Sat, 6 Mar 2021, 20:02
Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark
Extremely Slow for Large Number of Files?
To: 



[
https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17296528#comment-17296528
]

Takeshi Yamamuro commented on SPARK-34648:
--

Please use the mailing list (user@spark.apache.org) instead. This is not a
right place to ask questions.

> Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
> 
>
> Key: SPARK-34648
> URL: https://issues.apache.org/jira/browse/SPARK-34648
> Project: Spark
>  Issue Type: Question
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Pankaj Bhootra
>Priority: Major
>
> Hello Team
> I am new to Spark and this question may be a possible duplicate of the
issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347
> We have a large dataset partitioned by calendar date, and within each
date partition, we are storing the data as *parquet* files in 128 parts.
> We are trying to run aggregation on this dataset for 366 dates at a time
with Spark SQL on spark version 2.3.0, hence our Spark job is reading
366*128=46848 partitions, all of which are parquet files. There is
currently no *_metadata* or *_common_metadata* file(s) available for this
dataset.
> The problem we are facing is that when we try to run
*spark.read.parquet* on the above 46848 partitions, our data reads are
extremely slow. It takes a long time to run even a simple map task (no
shuffling) without any aggregation or group by.
> I read through the above issue and I think I perhaps generally understand
the ideas around *_common_metadata* file. But the above issue was raised
for Spark 1.3.1 and for Spark 2.3.0, I have not found any documentation
related to this metadata file so far.
> I would like to clarify:
>  # What's the latest, best practice for reading large number of parquet
files efficiently?
>  # Does this involve using any additional options with
spark.read.parquet? How would that work?
>  # Are there other possible reasons for slow data reads apart from
reading metadata for every part? We are basically trying to migrate our
existing spark pipeline from using csv files to parquet, but from my
hands-on so far, it seems that parquet's read time is slower than csv? This
seems contradictory to popular opinion that parquet performs better in
terms of both computation and storage?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


RE: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-26 Thread van den Heever, Christian CC
Hi,

How do I get the filename from textFileStream when using streaming?

Thanks a mill


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Juho Autio
> Not sure if the dynamic overwrite logic is implemented in Spark or in Hive

AFAIK I'm using spark implementation(s). Does the thread dump that I posted
show that? I'd like to remain within Spark impl.

What I'm trying to ask is, do you spark developers see some ways to
optimize this?

Otherwise, I'm not sure what you mean by this:

> There is a probably a limit in the number of element you can pass in the
list of partitions for the listPartitionsWithAuthInfo API call

That request takes a "max" argument, which is just a limit. The type is
short, so the maximum size per response is 32767. Anyway, even with this single
request & response it already takes those 5 minutes.

On Thu, Apr 25, 2019 at 5:46 PM vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> There is a probably a limit in the number of element you can pass in the
> list of partitions for the listPartitionsWithAuthInfo API call. Not sure if
> the dynamic overwrite logic is implemented in Spark or in Hive, in which
> case using hive 1.2.1 is probably the reason for un-optimized logic but
> also a huge constraint for solving this issue as upgrading Hive version is
> a real challenge
>
> On Thu, 25 Apr 2019 at 15:10, Juho Autio  wrote:
>
>> Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
>> which is always quite fast. Spark's insertInto uses
>> get_partitions_with_auth which is much slower (it also gets location
>> etc. of each partition).
>>
>> I created a test in java that with a local metastore client to measure
>> the time:
>>
>> I used the Short.MAX_VALUE (32767) as max for both (so also get 32767
>> partitions in both responses). I didn't get next page of results, but this
>> gives the idea already:
>>
>> listPartitionNames completed in: 1540 ms ~= 1,5 seconds
>> listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes
>>
>> I wonder if this can be optimized on metastore side, but at least it
>> doesn't seem to be CPU-bound on the RDS db (we're using Hive metastore,
>> backed by AWS RDS).
>>
>> So my original question remains; does spark need to know about all
>> existing partitions for dynamic overwrite? I don't see why it would.
>>
>> On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
>> vincent.gromakow...@gmail.com> wrote:
>>
>>> Which metastore are you using?
>>>
>>> On Thu, 25 Apr 2019 at 09:02, Juho Autio  wrote:
>>>
 Would anyone be able to answer this question about the non-optimal
 implementation of insertInto?

 On Thu, Apr 18, 2019 at 4:45 PM Juho Autio 
 wrote:

> Hi,
>
> My job is writing ~10 partitions with insertInto. With the same input
> / output data the total duration of the job is very different depending on
> how many partitions the target table has.
>
> Target table with 10 of partitions:
> 1 min 30 s
>
> Target table with ~1 partitions:
> 13 min 0 s
>
> It seems that spark is always fetching the full list of partitions in
> target table. When this happens, the cluster is basically idling while
> driver is listing partitions.
>
> Here's a thread dump for executor driver from such idle time:
> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>
> Is there any way to optimize this currently? Is this a known issue?
> Any plans to improve?
>
> My code is essentially:
>
> spark = SparkSession.builder \
> .config('spark.sql.hive.caseSensitiveInferenceMode',
> 'NEVER_INFER') \
> .config("hive.exec.dynamic.partition", "true") \
> .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
> .enableHiveSupport() \
> .getOrCreate()
>
> out_df.write \
> .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
> .insertInto(target_table_name, overwrite=True)
>
> Table has been originally created from spark with saveAsTable.
>
> Does spark need to know anything about the existing partitions though?
> As a manual workaround I would write the files directly to the partition
> locations, delete existing files first if there's anything in that
> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
> PARTITION. This doesn't require previous knowledge on existing partitions.
>
> Thanks.
>

>>
>> --
>> *Juho Autio*
>> Senior Data Engineer
>>
>> Data Engineering, Games
>> Rovio Entertainment Corporation
>> Mobile: + 358 (0)45 313 0122
>> juho.au...@rovio.com
>> www.rovio.com
>>

Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread vincent gromakowski
There is probably a limit on the number of elements you can pass in the
list of partitions for the listPartitionsWithAuthInfo API call. Not sure if
the dynamic overwrite logic is implemented in Spark or in Hive; in the latter
case, using Hive 1.2.1 is probably the reason for the un-optimized logic, but
it is also a huge constraint for solving this issue, as upgrading the Hive
version is a real challenge

On Thu, 25 Apr 2019 at 15:10, Juho Autio  wrote:

> Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
> which is always quite fast. Spark's insertInto uses
> get_partitions_with_auth which is much slower (it also gets location etc.
> of each partition).
>
> I created a test in java that with a local metastore client to measure the
> time:
>
> I used the Short.MAX_VALUE (32767) as max for both (so also get 32767
> partitions in both responses). I didn't get next page of results, but this
> gives the idea already:
>
> listPartitionNames completed in: 1540 ms ~= 1,5 seconds
> listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes
>
> I wonder if this can be optimized on metastore side, but at least it
> doesn't seem to be CPU-bound on the RDS db (we're using Hive metastore,
> backed by AWS RDS).
>
> So my original question remains; does spark need to know about all
> existing partitions for dynamic overwrite? I don't see why it would.
>
> On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
> vincent.gromakow...@gmail.com> wrote:
>
>> Which metastore are you using?
>>
>> On Thu, 25 Apr 2019 at 09:02, Juho Autio  wrote:
>>
>>> Would anyone be able to answer this question about the non-optimal
>>> implementation of insertInto?
>>>
>>> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:
>>>
 Hi,

 My job is writing ~10 partitions with insertInto. With the same input /
 output data the total duration of the job is very different depending on
 how many partitions the target table has.

 Target table with 10 of partitions:
 1 min 30 s

 Target table with ~1 partitions:
 13 min 0 s

 It seems that spark is always fetching the full list of partitions in
 target table. When this happens, the cluster is basically idling while
 driver is listing partitions.

 Here's a thread dump for executor driver from such idle time:
 https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20

 Is there any way to optimize this currently? Is this a known issue? Any
 plans to improve?

 My code is essentially:

 spark = SparkSession.builder \
 .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER')
 \
 .config("hive.exec.dynamic.partition", "true") \
 .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
 .config("hive.exec.dynamic.partition.mode", "nonstrict") \
 .enableHiveSupport() \
 .getOrCreate()

 out_df.write \
 .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
 .insertInto(target_table_name, overwrite=True)

 Table has been originally created from spark with saveAsTable.

 Does spark need to know anything about the existing partitions though?
 As a manual workaround I would write the files directly to the partition
 locations, delete existing files first if there's anything in that
 partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
 PARTITION. This doesn't require previous knowledge on existing partitions.

 Thanks.

>>>
>
> --
> *Juho Autio*
> Senior Data Engineer
>
> Data Engineering, Games
> Rovio Entertainment Corporation
> Mobile: + 358 (0)45 313 0122
> juho.au...@rovio.com
> www.rovio.com
>
>


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Juho Autio
Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
which is always quite fast. Spark's insertInto uses
get_partitions_with_auth which
is much slower (it also gets location etc. of each partition).

I created a test in Java with a local metastore client to measure the time:

I used the Short.MAX_VALUE (32767) as max for both (so also get 32767
partitions in both responses). I didn't get next page of results, but this
gives the idea already:

listPartitionNames completed in: 1540 ms ~= 1,5 seconds
listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes

I wonder if this can be optimized on metastore side, but at least it
doesn't seem to be CPU-bound on the RDS db (we're using Hive metastore,
backed by AWS RDS).

So my original question remains: does Spark need to know about all existing
partitions for dynamic overwrite? I don't see why it would.

On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> Which metastore are you using?
>
> On Thu, 25 Apr 2019 at 09:02, Juho Autio  wrote:
>
>> Would anyone be able to answer this question about the non-optimal
>> implementation of insertInto?
>>
>> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:
>>
>>> Hi,
>>>
>>> My job is writing ~10 partitions with insertInto. With the same input /
>>> output data the total duration of the job is very different depending on
>>> how many partitions the target table has.
>>>
>>> Target table with 10 of partitions:
>>> 1 min 30 s
>>>
>>> Target table with ~1 partitions:
>>> 13 min 0 s
>>>
>>> It seems that spark is always fetching the full list of partitions in
>>> target table. When this happens, the cluster is basically idling while
>>> driver is listing partitions.
>>>
>>> Here's a thread dump for executor driver from such idle time:
>>> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>>>
>>> Is there any way to optimize this currently? Is this a known issue? Any
>>> plans to improve?
>>>
>>> My code is essentially:
>>>
>>> spark = SparkSession.builder \
>>> .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>>> .config("hive.exec.dynamic.partition", "true") \
>>> .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>>> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>>> .enableHiveSupport() \
>>> .getOrCreate()
>>>
>>> out_df.write \
>>> .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>>> .insertInto(target_table_name, overwrite=True)
>>>
>>> Table has been originally created from spark with saveAsTable.
>>>
>>> Does spark need to know anything about the existing partitions though?
>>> As a manual workaround I would write the files directly to the partition
>>> locations, delete existing files first if there's anything in that
>>> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
>>> PARTITION. This doesn't require previous knowledge on existing partitions.
>>>
>>> Thanks.
>>>
>>

-- 
*Juho Autio*
Senior Data Engineer

Data Engineering, Games
Rovio Entertainment Corporation
Mobile: + 358 (0)45 313 0122
juho.au...@rovio.com
www.rovio.com



Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Khare, Ankit
Why do you need 1 partition when 10 partition is doing the job .. ??

 Thanks
Ankit

From: vincent gromakowski 
Date: Thursday, 25. April 2019 at 09:12
To: Juho Autio 
Cc: user 
Subject: Re: [Spark SQL]: Slow insertInto overwrite if target table has many 
partitions

Which metastore are you using?

On Thu, 25 Apr 2019 at 09:02, Juho Autio <juho.au...@rovio.com> wrote:
Would anyone be able to answer this question about the non-optimal 
implementation of insertInto?

On Thu, Apr 18, 2019 at 4:45 PM Juho Autio <juho.au...@rovio.com> wrote:
Hi,

My job is writing ~10 partitions with insertInto. With the same input / output 
data the total duration of the job is very different depending on how many 
partitions the target table has.

Target table with 10 of partitions:
1 min 30 s

Target table with ~1 partitions:
13 min 0 s

It seems that spark is always fetching the full list of partitions in target 
table. When this happens, the cluster is basically idling while driver is 
listing partitions.

Here's a thread dump for executor driver from such idle time:
https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20

Is there any way to optimize this currently? Is this a known issue? Any plans 
to improve?

My code is essentially:

spark = SparkSession.builder \
.config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
.config("hive.exec.dynamic.partition", "true") \
.config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()

out_df.write \
.option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
.insertInto(target_table_name, overwrite=True)

Table has been originally created from spark with saveAsTable.

Does spark need to know anything about the existing partitions though? As a 
manual workaround I would write the files directly to the partition locations, 
delete existing files first if there's anything in that partition, and then 
call metastore to ALTER TABLE IF NOT EXISTS ADD PARTITION. This doesn't require 
previous knowledge on existing partitions.

Thanks.


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread vincent gromakowski
Which metastore are you using?

On Thu, 25 Apr 2019 at 09:02, Juho Autio  wrote:

> Would anyone be able to answer this question about the non-optimal
> implementation of insertInto?
>
> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:
>
>> Hi,
>>
>> My job is writing ~10 partitions with insertInto. With the same input /
>> output data the total duration of the job is very different depending on
>> how many partitions the target table has.
>>
>> Target table with 10 of partitions:
>> 1 min 30 s
>>
>> Target table with ~1 partitions:
>> 13 min 0 s
>>
>> It seems that spark is always fetching the full list of partitions in
>> target table. When this happens, the cluster is basically idling while
>> driver is listing partitions.
>>
>> Here's a thread dump for executor driver from such idle time:
>> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>>
>> Is there any way to optimize this currently? Is this a known issue? Any
>> plans to improve?
>>
>> My code is essentially:
>>
>> spark = SparkSession.builder \
>> .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>> .config("hive.exec.dynamic.partition", "true") \
>> .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>> .enableHiveSupport() \
>> .getOrCreate()
>>
>> out_df.write \
>> .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>> .insertInto(target_table_name, overwrite=True)
>>
>> Table has been originally created from spark with saveAsTable.
>>
>> Does spark need to know anything about the existing partitions though? As
>> a manual workaround I would write the files directly to the partition
>> locations, delete existing files first if there's anything in that
>> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
>> PARTITION. This doesn't require previous knowledge on existing partitions.
>>
>> Thanks.
>>
>


Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-25 Thread Juho Autio
Would anyone be able to answer this question about the non-optimal
implementation of insertInto?

On Thu, Apr 18, 2019 at 4:45 PM Juho Autio  wrote:

> Hi,
>
> My job is writing ~10 partitions with insertInto. With the same input /
> output data the total duration of the job is very different depending on
> how many partitions the target table has.
>
> Target table with 10 of partitions:
> 1 min 30 s
>
> Target table with ~1 partitions:
> 13 min 0 s
>
> It seems that spark is always fetching the full list of partitions in
> target table. When this happens, the cluster is basically idling while
> driver is listing partitions.
>
> Here's a thread dump for executor driver from such idle time:
> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>
> Is there any way to optimize this currently? Is this a known issue? Any
> plans to improve?
>
> My code is essentially:
>
> spark = SparkSession.builder \
> .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
> .config("hive.exec.dynamic.partition", "true") \
> .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
> .config("hive.exec.dynamic.partition.mode", "nonstrict") \
> .enableHiveSupport() \
> .getOrCreate()
>
> out_df.write \
> .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
> .insertInto(target_table_name, overwrite=True)
>
> Table has been originally created from spark with saveAsTable.
>
> Does spark need to know anything about the existing partitions though? As
> a manual workaround I would write the files directly to the partition
> locations, delete existing files first if there's anything in that
> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
> PARTITION. This doesn't require previous knowledge on existing partitions.
>
> Thanks.
>


[Spark SQL]: Slow insertInto overwrite if target table has many partitions

2019-04-18 Thread Juho Autio
Hi,

My job is writing ~10 partitions with insertInto. With the same input /
output data the total duration of the job is very different depending on
how many partitions the target table has.

Target table with 10 of partitions:
1 min 30 s

Target table with ~1 partitions:
13 min 0 s

It seems that spark is always fetching the full list of partitions in
target table. When this happens, the cluster is basically idling while
driver is listing partitions.

Here's a thread dump for executor driver from such idle time:
https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20

Is there any way to optimize this currently? Is this a known issue? Any
plans to improve?

My code is essentially:

spark = SparkSession.builder \
.config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
.config("hive.exec.dynamic.partition", "true") \
.config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
.config("hive.exec.dynamic.partition.mode", "nonstrict") \
.enableHiveSupport() \
.getOrCreate()

out_df.write \
.option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
.insertInto(target_table_name, overwrite=True)

Table has been originally created from spark with saveAsTable.

Does spark need to know anything about the existing partitions though? As a
manual workaround I would write the files directly to the partition
locations, delete existing files first if there's anything in that
partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
PARTITION. This doesn't require previous knowledge on existing partitions.
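
Roughly what I mean by that workaround, as a sketch (the table, column and paths
below are made up; the ALTER TABLE syntax is plain Spark/Hive SQL):

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

dt = "2019-04-18"                                      # hypothetical partition value
part_path = "s3://bucket/warehouse/my_table/dt=" + dt  # hypothetical location

# Write this partition's data straight to its final location, replacing
# whatever files were there before. The partition column is carried by the
# directory name, so it is dropped from the data files themselves.
out_df = spark.table("staging_table").where("dt = '%s'" % dt).drop("dt")
out_df.write.mode("overwrite").parquet(part_path)

# Register the partition in the metastore without fetching the full
# partition list of the target table.
spark.sql(
    "ALTER TABLE my_table ADD IF NOT EXISTS PARTITION (dt = '%s') LOCATION '%s'"
    % (dt, part_path))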

Thanks.


spark streaming slow checkpointing when calling Rserve

2016-09-19 Thread Piubelli, Manuel


Hello,



I wrote a spark streaming application in Java. It reads stock trades off of a 
data feed receiver and converts them to Tick objects, and uses a microbatch 
interval, window interval and sliding interval of 10 seconds. A 
JavaPairDStream<String, Iterable> is created where the key is the stock 
symbol.

The Tick objects are then stored in a JavaMapWithStateDStream using 
mapWithState; analytics calculations are performed in the mapWithState callback 
function using the Ticks as input. Everything works fine until I modified my 
program to also call Rserve inside the mapWithState callback function in order 
to perform additional analytics calculations in R.

When I started calling Rserve, every 10th window would take a long time to 
process; this is the window that also writes to the checkpoint file (I am using 
Hadoop). Every 10th window takes longer to process than the previous 10th 
window (window 30 takes longer than window 20 which takes longer than window 
10). All of the non-checkpoint windows finish well within 10 seconds, but the 
checkpoint windows can eventually take minutes to complete, and the other 
windows queue behind them.

I then tried to set the checkpoint interval on the JavaMapWithStateDStream to 
24 hours in order to effectively disable checkpointing 
(mapWithStateStream.checkpoint(Durations.minutes(1440))). I enabled the workers 
on the 3 server cluster with enough memory so that they would survive the 
growing memory usage that would result.

The results that I outputted to the log were unexpected. Previously the 
JavaPairDStream<String, Iterable> was being populated with 5000 keys, and 
it still was. But, previously 5000 keys were being passed to the mapWithState 
callback function; now only 200 keys were being passed to it, and I see many 
stages skipped in the Spark Streaming UI web page. When I run this in single 
process mode on my MS Windows machine, 5000 keys are still passed to the 
mapWithState callback function.

Does anyone have any idea of why calling Rserve would cause such a huge 
increase in checkpointing time, or why calling 
checkpoint(Durations.minutes(1440)) on the JavaMapWithStateDStream would cause 
spark to not pass most of the tuples in the JavaPairDStream<String, 
Iterable> to the mapWithState callback function?



Question is also posted on 
http://stackoverflow.com/questions/39535804/spark-streaming-slow-checkpointing-when-calling-rserve.



Thanks



mongo-hadoop with Spark is slow for me, and adding nodes doesn't seem to make any noticeable difference

2015-09-21 Thread cscarioni
Hi, I appreciate any help or pointers in the right direction.

My current test scenario is the following.

I want to process a MongoDB collection, anonymising some fields on it and
store it in another Collection.

The size of the collection is around 900 GB with 2.5 million documents

Following is the code.



object Anonymizer extends SparkRunner {

  val sqlContext = new SQLContext(sc)

  MongoDBLoader(conf, sc,
"output").load(MongoHadoopImplementationReader(conf, sc, "input").rdd,
(dbObject: BSONObject) => {
  dbObject.put("add_field", "John Macclane")
  val embedded = dbObject.get("embedded").asInstanceOf[BasicDBObject]
  embedded.put("business_name", Name.first_name)
  dbObject.put("embedded", embedded) // put the anonymised sub-document back
  val notesWrapper =
Option(dbObject.get("embedded_list").asInstanceOf[java.util.ArrayList[BasicDBObject]])
  notesWrapper match {
case Some(notes) =>
  notes.foreach((note: BasicDBObject) => {
note.put("text", Name.name)
  })
case None =>
  }
  dbObject
}
  )
}...

And




case class MongoHadoopImplementationReader(conf: com.typesafe.config.Config,
sc: SparkContext, collection: String) {
  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.input.uri",
   
s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_database")}.${collection}")
  mongoConfig.set("mongo.input.split_size", "50")
  mongoConfig.set("mongo.input.limit", "70")


  def rdd: RDD[(Object, BSONObject)] = {
val rdd = sc.newAPIHadoopRDD(
  mongoConfig,
  classOf[MongoInputFormat],
  classOf[Object],
  classOf[BSONObject])
rdd
  }

}


And 


case class MongoDBLoader(conf: com.typesafe.config.Config, sc:SparkContext,
collection: String) {

  val mongoConfig = new Configuration()

  mongoConfig.set("mongo.output.uri",
   
s"mongodb://${conf.getString("replicant.mongo_host")}:27017/${conf.getString("replicant.mongo_output_database")}.${collection}")

  def load(rdd: => RDD[(Object, BSONObject)], transformer: (BSONObject) =>
BSONObject) = {

val mongoRDD = rdd.map[(Object, BSONObject)]((tuple: (Object,
BSONObject)) => {
  (null, transformer(tuple._2))
})

mongoRDD.saveAsNewAPIHadoopFile(
  "file:///this-is-completely-unused",
  classOf[Object],
  classOf[BSONObject],
  classOf[MongoOutputFormat[Object, BSONObject]],
  mongoConfig)
  }
}


This code runs slowly, taking 9.5 hours on a 3-machine cluster to process
everything. After 6 hours on a 30-machine cluster I stopped it, as it was only
about half processed.

The machines are ec2 m3.large instances. The MongoDB lives on another EC2
instance inside the same VPC and same subnet.

I tried to look into the configuration options but it seems that in most
cases the defaults are the way to go (number of cores, memory, etc). 

It looks like I have a bottleneck somewhere, but I am not sure where. I am
thinking Mongo is not able to handle the parallelism?

How are the RDDs stored in memory? When I run it, I see around 32000
partitions and tasks created. Processing then seems to slow down as it
advances (this could be because the Mongo documents are bigger in the second
half of our DB).

I also see that each split is stored in HDFS by Spark and then read and
bulk-inserted into Mongo. However, there is a lot of HDFS space (around 30 GB
per machine) and only a tiny fraction is used. Wouldn't it be better to fill
this more and only insert into Mongo when more data is available?

I also tried to increase the split size, but it complains of not enough
resources on the worker. However, I don't think the splits are big enough to
actually fill the 6 GB of memory on each node, as what it stores on HDFS is a
lot less than that.

Is there anything obvious (or not :)) that I am not doing correctly? Is this
the correct way to transform a collection from Mongo to Mongo? Is there
another way?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/mongo-hadoop-with-Spark-is-slow-for-me-and-adding-nodes-doesn-t-seem-to-make-any-noticeable-differene-tp24754.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread allanjie
 locations for shuffle 0 to sparkExecutor@HadoopV26Slave2:33439
15/05/24 16:27:35 INFO spark.MapOutputTrackerMaster: Size of output statuses
for shuffle 0 is 188 bytes
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave3:52152
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave6:52445 (size: 57.0 KB, free: 2.1 GB)
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave5:45835 (size: 57.0 KB, free: 2.1 GB)
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave6:37211
15/05/24 16:27:35 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to sparkExecutor@HadoopV26Slave5:43757
15/05/24 16:27:35 INFO storage.BlockManagerInfo: Added rdd_4_1 in memory on
HadoopV26Slave3:38931 (size: 1733.1 KB, free: 1656.2 MB)
15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_2 in memory on
HadoopV26Slave2:44597 (size: 1733.1 KB, free: 1656.2 MB)
15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_3 in memory on
HadoopV26Slave6:52445 (size: 1717.4 KB, free: 2.1 GB)
15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
1.0 (TID 5) in 1757 ms on HadoopV26Slave3 (1/4)
15/05/24 16:27:36 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
1.0 (TID 6) in 1776 ms on HadoopV26Slave2 (2/4)
15/05/24 16:27:36 INFO storage.BlockManagerInfo: Added rdd_4_0 in memory on
HadoopV26Slave5:45835 (size: 1733.1 KB, free: 2.1 GB)
15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
1.0 (TID 7) in 3153 ms on HadoopV26Slave6 (3/4)
15/05/24 16:27:38 INFO scheduler.DAGScheduler: Stage 1 (saveAsTextFile at
Clustering.java:117) finished in 3.258 s
15/05/24 16:27:38 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
1.0 (TID 4) in 3256 ms on HadoopV26Slave5 (4/4)
15/05/24 16:27:38 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0,
whose tasks have all completed, from pool 
15/05/24 16:27:38 INFO scheduler.DAGScheduler: Job 0 finished:
saveAsTextFile at Clustering.java:117, took 3807.229501 s

Although I can obtain the result, it's too slow, right?
The following links also show the final result info:
http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/result.png 
http://apache-spark-user-list.1001560.n3.nabble.com/file/n23003/mapvalueres.png
 

PS: if I reduce the size of the input to just 10 records, it performs very
fast. But it doesn't make any sense to use just 10 records.
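
For context, a minimal PySpark sketch (placeholder paths, a stand-in function, and an existing SparkContext sc are assumed) of why adding saveAsTextFile appears to make everything slow: transformations are lazy, so the cost of the whole upstream pipeline is only paid when an action such as saveAsTextFile finally runs.

def expensive_function(line):                       # stand-in for the real per-record work
    return line.upper()

rdd = sc.textFile("hdfs:///path/to/input")          # placeholder input path
mapped = rdd.map(expensive_function)                # returns immediately; nothing is computed yet
mapped.saveAsTextFile("hdfs:///path/to/output")     # this action triggers the whole pipeline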




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-dramatically-slow-when-I-add-saveAsTextFile-tp23003.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark dramatically slow when I add saveAsTextFile

2015-05-24 Thread Joe Wass




Re: Join on Spark too slow.

2015-04-09 Thread Guillaume Pitel
Maybe I'm wrong, but what you are doing here is basically a bunch of 
cartesian products, one per key. So if "hello" appears 100 times in your 
corpus, it will produce 100*100 elements in the join output.
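
A minimal PySpark illustration of that blow-up, with made-up data and an existing SparkContext sc assumed: a key occurring m times on one side and n times on the other yields m*n rows in the join output.

left = sc.parallelize([("hello", 1), ("hello", 2), ("hello", 3)])   # key appears 3 times
right = sc.parallelize([("hello", "a"), ("hello", "b")])            # key appears 2 times
print(left.join(right).count())                                     # 3 * 2 = 6 output rows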


I don't understand what you're doing here, but it's normal that your join 
takes forever; it makes no sense as is, IMO.


Guillaume

Hello guys,

I am trying to run the following dummy example for Spark,
on a dataset of 250MB, using 5 machines with 10GB RAM
each, but the join seems to be taking too long ( 2hrs).

I am using Spark 0.8.0 but I have also tried the same example
on more recent versions, with the same results.

Do you have any idea why this is happening?

Thanks a lot,
Kostas
val sc = new SparkContext(
  args(0),
  "DummyJoin",
  System.getenv("SPARK_HOME"),
  Seq(System.getenv("SPARK_EXAMPLES_JAR")))

val file = sc.textFile(args(1))

val wordTuples = file
  .flatMap(line => line.split(args(2)))
  .map(word => (word, 1))

val big = wordTuples.filter {
  case ((k, v)) => k != "a"
}.cache()

val small = wordTuples.filter {
  case ((k, v)) => k != "a" && k != "to" && k != "and"
}.cache()

val res = big.leftOuterJoin(small)
res.saveAsTextFile(args(3))
}



--
eXenSa


*Guillaume PITEL, Président*
+33(0)626 222 431

eXenSa S.A.S. http://www.exensa.com/
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)184 163 677 / Fax +33(0)972 283 705



Join on Spark too slow.

2015-04-09 Thread Kostas Kloudas
Hello guys,

I am trying to run the following dummy example for Spark,
on a dataset of 250MB, using 5 machines with 10GB RAM
each, but the join seems to be taking too long ( 2hrs).

I am using Spark 0.8.0 but I have also tried the same example
on more recent versions, with the same results.

Do you have any idea why this is happening?

Thanks a lot,
Kostas

val sc = new SparkContext(
  args(0),
  "DummyJoin",
  System.getenv("SPARK_HOME"),
  Seq(System.getenv("SPARK_EXAMPLES_JAR")))

val file = sc.textFile(args(1))

val wordTuples = file
  .flatMap(line => line.split(args(2)))
  .map(word => (word, 1))

val big = wordTuples.filter {
  case ((k, v)) => k != "a"
}.cache()

val small = wordTuples.filter {
  case ((k, v)) => k != "a" && k != "to" && k != "and"
}.cache()

val res = big.leftOuterJoin(small)
res.saveAsTextFile(args(3))
  }


Re: Join on Spark too slow.

2015-04-09 Thread ๏̯͡๏
If your data has special characteristics, like one side small and the other
large, then you can think of doing a map-side join in Spark using broadcast
values; this will speed things up.
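
A minimal PySpark sketch of that map-side pattern, assuming the small side fits in driver memory (the RDD names here are placeholders, not from the thread):

small_map = sc.broadcast(small_rdd.collectAsMap())           # small_rdd: a pair RDD small enough to collect
joined = big_rdd.flatMap(
    lambda kv: [(kv[0], (kv[1], small_map.value[kv[0]]))]    # emit a joined row when the key is present
               if kv[0] in small_map.value else [])          # drop keys missing from the small side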

Otherwise, as Pitel mentioned, if there is nothing special and it's just a
cartesian product, it might take forever, or you might increase the number of executors.

On Thu, Apr 9, 2015 at 8:37 PM, Guillaume Pitel guillaume.pi...@exensa.com
wrote:

  Maybe I'm wrong, but what you are doing here is basically a bunch of
 cartesian products, one per key. So if "hello" appears 100 times in your
 corpus, it will produce 100*100 elements in the join output.

 I don't understand what you're doing here, but it's normal that your join takes
 forever; it makes no sense as is, IMO.

 Guillaume

 Hello guys,

  I am trying to run the following dummy example for Spark,
 on a dataset of 250MB, using 5 machines with 10GB RAM
 each, but the join seems to be taking too long ( 2hrs).

  I am using Spark 0.8.0 but I have also tried the same example
 on more recent versions, with the same results.

  Do you have any idea why this is happening?

  Thanks a lot,
 Kostas

  val sc = new SparkContext(
    args(0),
    "DummyJoin",
    System.getenv("SPARK_HOME"),
    Seq(System.getenv("SPARK_EXAMPLES_JAR")))

  val file = sc.textFile(args(1))

  val wordTuples = file
    .flatMap(line => line.split(args(2)))
    .map(word => (word, 1))

  val big = wordTuples.filter {
    case ((k, v)) => k != "a"
  }.cache()

  val small = wordTuples.filter {
    case ((k, v)) => k != "a" && k != "to" && k != "and"
  }.cache()

  val res = big.leftOuterJoin(small)
  res.saveAsTextFile(args(3))
  }



 --
[image: eXenSa]
  *Guillaume PITEL, Président*
 +33(0)626 222 431

 eXenSa S.A.S. http://www.exensa.com/
  41, rue Périer - 92120 Montrouge - FRANCE
 Tel +33(0)184 163 677 / Fax +33(0)972 283 705




-- 
Deepak


Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-22 Thread TJ Klein
Seems like it is a bug rather than a feature.
I filed a bug report: https://issues.apache.org/jira/browse/SPARK-5363



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278p21317.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Davies Liu
We have not met this issue, so we are not sure whether there are bugs
related to the reused worker or not.

Could you provide more details about it?

On Wed, Jan 21, 2015 at 2:27 AM, critikaled isasmani@gmail.com wrote:
 I'm also facing the same issue.
 is this a bug?



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278p21283.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Tassilo Klein
I set spark.python.worker.reuse = false and now it seems to run longer than
before (it has not crashed yet). However, it is very, very slow. How should I
proceed?

On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote:

 Could you try to disable the new feature of reused worker by:
 spark.python.worker.reuse = false

 On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu
 wrote:
  Hi,
 
  It's a bit of a longer script that runs some deep learning training.
  Therefore it is a bit hard to wrap up easily.
 
  Essentially I am having a loop, in which a gradient is computed on each
 node
  and collected (this is where it freezes at some point).
 
   grads = zipped_trainData.map(distributed_gradient_computation).collect()
 
 
  The distributed_gradient_computation mainly contains a Theano derived
  function. The theano function itself is a broadcast variable.
 
  Let me know if you need more information.
 
  Best,
   Tassilo
 
  On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you provide a short script to reproduce this issue?
 
  On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
   Hi,
  
   I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
   PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
   faster
   than Spark 1.1. However, the initial joy faded quickly when I noticed
   that
   all my stuff didn't successfully terminate operations anymore. Using
   Spark
   1.1 it still works perfectly fine, though.
   Specifically, the execution just freezes without any error output at
 one
   point, when calling a joint map() and collect() statement (after
 having
   it
   called many times successfully before in a loop).
  
   Any clue? Or do I have to wait for the next version?
  
   Best,
Tassilo
  
  
  
   --
   View this message in context:
  
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
   Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 


 The information in this e-mail is intended only for the person to whom it
 is
 addressed. If you believe this e-mail was sent to you in error and the
 e-mail
 contains patient information, please contact the Partners Compliance
 HelpLine at
 http://www.partners.org/complianceline . If the e-mail was sent to you in
 error
 but does not contain patient information, please contact the sender and
 properly
 dispose of the e-mail.



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Davies Liu
Because you have a large broadcast, it needs to be loaded into the
Python worker for each task if the worker is not reused.

We would really appreciate it if you could provide a short script to
reproduce the freeze, so we can investigate the root cause and fix
it. Also, please file a JIRA for it, thanks!
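
To make that concrete, a minimal PySpark sketch of the pattern being discussed (big_model_object and compute_gradient are placeholders; zipped_trainData and the function name come from the thread): a broadcast variable referenced inside the mapped function has to be available in every Python worker process, so without worker reuse it gets re-loaded for each task.

bc_model = sc.broadcast(big_model_object)         # big_model_object: placeholder for the large broadcast

def distributed_gradient_computation(batch):      # name borrowed from the thread; body is a stand-in
    model = bc_model.value                        # deserialized inside the Python worker
    return compute_gradient(model, batch)         # compute_gradient: hypothetical helper

grads = zipped_trainData.map(distributed_gradient_computation).collect()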

On Wed, Jan 21, 2015 at 4:56 PM, Tassilo Klein tjkl...@gmail.com wrote:
 I set spark.python.worker.reuse = false and now it seems to run longer than
 before (it has not crashed yet). However, it is very very slow. How to
 proceed?

 On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com wrote:

 Could you try to disable the new feature of reused worker by:
 spark.python.worker.reuse = false

 On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu
 wrote:
  Hi,
 
  It's a bit of a longer script that runs some deep learning training.
  Therefore it is a bit hard to wrap up easily.
 
  Essentially I am having a loop, in which a gradient is computed on each
  node
  and collected (this is where it freezes at some point).
 
   grads =
  zipped_trainData.map(distributed_gradient_computation).collect()
 
 
  The distributed_gradient_computation mainly contains a Theano derived
  function. The theano function itself is a broadcast variable.
 
  Let me know if you need more information.
 
  Best,
   Tassilo
 
  On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com
  wrote:
 
  Could you provide a short script to reproduce this issue?
 
  On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
   Hi,
  
   I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
   PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
   faster
   than Spark 1.1. However, the initial joy faded quickly when I noticed
   that
   all my stuff didn't successfully terminate operations anymore. Using
   Spark
   1.1 it still works perfectly fine, though.
   Specifically, the execution just freezes without any error output at
   one
   point, when calling a joint map() and collect() statement (after
   having
   it
   called many times successfully before in a loop).
  
   Any clue? Or do I have to wait for the next version?
  
   Best,
Tassilo
  
  
  
   --
   View this message in context:
  
   http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
   Sent from the Apache Spark User List mailing list archive at
   Nabble.com.
  
   -
   To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
   For additional commands, e-mail: user-h...@spark.apache.org
  
 
 





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread Tassilo Klein
What do you suggest? Should I send you the script so you can run it
yourself?
 Yes, my broadcast variables are fairly large (1.7 MBytes).

On Wed, Jan 21, 2015 at 8:20 PM, Davies Liu dav...@databricks.com wrote:

 Because that you have large broadcast, they need to be loaded into
 Python worker for each tasks, if the worker is not reused.

 We will really appreciate that if you could provide a short script to
 reproduce the freeze, then we can investigate the root cause and fix
 it. Also, fire a JIRA for it, thanks!

 On Wed, Jan 21, 2015 at 4:56 PM, Tassilo Klein tjkl...@gmail.com wrote:
  I set spark.python.worker.reuse = false and now it seems to run longer
 than
  before (it has not crashed yet). However, it is very very slow. How to
  proceed?
 
  On Wed, Jan 21, 2015 at 2:21 AM, Davies Liu dav...@databricks.com
 wrote:
 
  Could you try to disable the new feature of reused worker by:
  spark.python.worker.reuse = false
 
  On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein 
 tjkl...@bwh.harvard.edu
  wrote:
   Hi,
  
   It's a bit of a longer script that runs some deep learning training.
   Therefore it is a bit hard to wrap up easily.
  
   Essentially I am having a loop, in which a gradient is computed on
 each
   node
   and collected (this is where it freezes at some point).
  
grads =
   zipped_trainData.map(distributed_gradient_computation).collect()
  
  
   The distributed_gradient_computation mainly contains a Theano derived
   function. The theano function itself is a broadcast variable.
  
   Let me know if you need more information.
  
   Best,
Tassilo
  
   On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com
   wrote:
  
   Could you provide a short script to reproduce this issue?
  
   On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
Hi,
   
I just recently tried to migrate from Spark 1.1 to Spark 1.2 -
 using
PySpark. Initially, I was super glad, noticing that Spark 1.2 is
 way
faster
than Spark 1.1. However, the initial joy faded quickly when I
 noticed
that
all my stuff didn't successfully terminate operations anymore.
 Using
Spark
1.1 it still works perfectly fine, though.
Specifically, the execution just freezes without any error output
 at
one
point, when calling a joint map() and collect() statement (after
having
it
called many times successfully before in a loop).
   
Any clue? Or do I have to wait for the next version?
   
Best,
 Tassilo
   
   
   
--
View this message in context:
   
   
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.
   
   
 -
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
   
  
  
 
 
 
 



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-21 Thread critikaled
I'm also facing the same issue.
Is this a bug?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278p21283.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread TJ Klein
Hi,

I just recently tried to migrate from Spark 1.1 to Spark 1.2, using
PySpark. Initially I was very glad, noticing that Spark 1.2 is way faster
than Spark 1.1. However, the initial joy faded quickly when I noticed that
my jobs no longer terminate successfully. With Spark 1.1 everything still
works perfectly fine, though.
Specifically, the execution just freezes at one point without any error
output, when calling a combined map() and collect() statement (after having
called it many times successfully before in a loop).

Any clue? Or do I have to wait for the next version?

Best,
 Tassilo



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread Davies Liu
Could you provide a short script to reproduce this issue?

On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
 Hi,

 I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
 PySpark. Initially, I was super glad, noticing that Spark 1.2 is way faster
 than Spark 1.1. However, the initial joy faded quickly when I noticed that
 all my stuff didn't successfully terminate operations anymore. Using Spark
 1.1 it still works perfectly fine, though.
 Specifically, the execution just freezes without any error output at one
 point, when calling a joint map() and collect() statement (after having it
 called many times successfully before in a loop).

 Any clue? Or do I have to wait for the next version?

 Best,
  Tassilo



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread Tassilo Klein
Hi,

It's a rather long script that runs some deep learning training, so it is a
bit hard to boil down to a short example.

Essentially I have a loop in which a gradient is computed on each
node and collected (this is where it freezes at some point).

 grads = zipped_trainData.map(distributed_gradient_computation).collect()


The distributed_gradient_computation mainly contains a Theano derived
function. The theano function itself is a broadcast variable.

Let me know if you need more information.

Best,
 Tassilo

On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com wrote:

 Could you provide a short script to reproduce this issue?

 On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
  Hi,
 
  I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
  PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
 faster
  than Spark 1.1. However, the initial joy faded quickly when I noticed
 that
  all my stuff didn't successfully terminate operations anymore. Using
 Spark
  1.1 it still works perfectly fine, though.
  Specifically, the execution just freezes without any error output at one
  point, when calling a joint map() and collect() statement (after having
 it
  called many times successfully before in a loop).
 
  Any clue? Or do I have to wait for the next version?
 
  Best,
   Tassilo
 
 
 
  --
  View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



Re: Spark 1.1 (slow, working), Spark 1.2 (fast, freezing)

2015-01-20 Thread Davies Liu
Could you try disabling the new reused-worker feature by setting:
spark.python.worker.reuse = false
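
One way to apply that setting from a PySpark driver, sketched with a placeholder app name (the property can also go into spark-defaults.conf):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("reuse-test")                      # placeholder app name
        .set("spark.python.worker.reuse", "false"))    # disable the reused-worker feature
sc = SparkContext(conf=conf)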

On Tue, Jan 20, 2015 at 11:12 PM, Tassilo Klein tjkl...@bwh.harvard.edu wrote:
 Hi,

 It's a bit of a longer script that runs some deep learning training.
 Therefore it is a bit hard to wrap up easily.

 Essentially I am having a loop, in which a gradient is computed on each node
 and collected (this is where it freezes at some point).

  grads = zipped_trainData.map(distributed_gradient_computation).collect()


 The distributed_gradient_computation mainly contains a Theano derived
 function. The theano function itself is a broadcast variable.

 Let me know if you need more information.

 Best,
  Tassilo

 On Wed, Jan 21, 2015 at 1:17 AM, Davies Liu dav...@databricks.com wrote:

 Could you provide a short script to reproduce this issue?

 On Tue, Jan 20, 2015 at 9:00 PM, TJ Klein tjkl...@gmail.com wrote:
  Hi,
 
  I just recently tried to migrate from Spark 1.1 to Spark 1.2 - using
  PySpark. Initially, I was super glad, noticing that Spark 1.2 is way
  faster
  than Spark 1.1. However, the initial joy faded quickly when I noticed
  that
  all my stuff didn't successfully terminate operations anymore. Using
  Spark
  1.1 it still works perfectly fine, though.
  Specifically, the execution just freezes without any error output at one
  point, when calling a joint map() and collect() statement (after having
  it
  called many times successfully before in a loop).
 
  Any clue? Or do I have to wait for the next version?
 
  Best,
   Tassilo
 
 
 
  --
  View this message in context:
  http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-1-slow-working-Spark-1-2-fast-freezing-tp21278.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
  -
  To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
  For additional commands, e-mail: user-h...@spark.apache.org
 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark run slow after unexpected repartition

2014-09-30 Thread matthes
I have the same problem! I start the same job 3 or 4 times in a row; how many
depends on how big the data and the cluster are. The runtime gets worse over
the following jobs, and at the end I get the fetch failure error; at that point
I must restart the Spark shell and everything works well again. And I don't use
the caching option!

By the way, I see the same behavior with different jobs!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-run-slow-after-unexpected-repartition-tp14542p15416.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark run slow after unexpected repartition

2014-09-18 Thread Tan Tim
I also encountered a similar problem: after some stages, all the tasks
are assigned to one machine, and the stage execution gets slower and slower.

[the Spark conf setting]
val conf = new SparkConf().setMaster(sparkMaster).setAppName("ModelTraining")
  .setSparkHome(sparkHome).setJars(List(jarFile))
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrator", "LRRegistrator")
conf.set("spark.storage.memoryFraction", "0.7")
conf.set("spark.executor.memory", "8g")
conf.set("spark.cores.max", "150")
conf.set("spark.speculation", "true")
conf.set("spark.storage.blockManagerHeartBeatMs", "30")

val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://xxx:52310" + inputPath, 3)
val trainset = lines.map(parseWeightedPoint).repartition(50)
  .persist(StorageLevel.MEMORY_ONLY)

[the warn log from Spark]
14/09/19 10:26:23 WARN TaskSetManager: Loss was due to fetch failure from
BlockManagerId(45, TS-BH109, 48384, 0)
14/09/19 10:27:18 WARN TaskSetManager: Lost TID 726 (task 14.0:9)
14/09/19 10:29:03 WARN SparkDeploySchedulerBackend: Ignored task status
update (737 state FAILED) from unknown executor
Actor[akka.tcp://sparkExecutor@TS-BH96:33178/user/Executor#-913985102] with
ID 39
14/09/19 10:29:03 WARN TaskSetManager: Loss was due to fetch failure from
BlockManagerId(30, TS-BH136, 28518, 0)
14/09/19 11:01:22 WARN BlockManagerMasterActor: Removing BlockManager
BlockManagerId(47, TS-BH136, 31644, 0) with no recent heart beats: 47765ms
exceeds 45000ms

Any suggestions?

On Thu, Sep 18, 2014 at 4:46 PM, shishu shi...@zamplus.com wrote:

  Hi dear all~

 My spark application sometimes runs much slower than it used to, so I
 wonder why this would happen.

 I found out that after a repartition stage (stage 17), all tasks go to one
 executor. But in my code, I only use repartition at the very beginning.

 In my application, before stage 17, every stage runs successfully within 1
 minute, but after stage 17, every stage costs more than 10 minutes.
 Normally my application runs successfully and finishes within 9 minutes.

 My Spark version is 0.9.1, and my program is written in Scala.



 I took some screenshots; you can see them in the archive.



 Great thanks if you can help~



 Shi Shu




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark running slow for small hadoop files of 10 mb size

2014-04-24 Thread neeravsalaria
Thanks for the reply. It indeed increased the usage. There was another issue
we found: we were broadcasting the Hadoop configuration by writing a wrapper
class over it, but then found the proper way in the Spark code:

sc.broadcast(new SerializableWritable(conf))





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-running-slow-for-small-hadoop-files-of-10-mb-size-tp4526p4811.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark running slow for small hadoop files of 10 mb size

2014-04-22 Thread Andre Bois-Crettez

The data partitioning is done by default *according to the number of
HDFS blocks* of the source.
You can change the partitioning with .repartition(), either to increase or
decrease the level of parallelism:

val recordsRDD =
  SparkContext.sequenceFile[NullWritable, BytesWritable](FilePath, 256)
val recordsRDDInParallel = recordsRDD.repartition(4 * 32)
val infoRDD = recordsRDDInParallel.map(f => info_func())
val hdfs_RDD = infoRDD.reduceByKey(_ + _, 48) /* makes 48 partitions */
hdfs_RDD.saveAsNewAPIHadoopFile



André
On 2014-04-21 13:21, neeravsalaria wrote:

Hi,

I have been using MapReduce to analyze multiple files whose sizes range
from 10 MB to 200 MB per file. Recently I planned to move to Spark, but my
Spark job is taking too much time executing a single file when the file
size is 10 MB and the HDFS block size is 64 MB. It executes on a single
DataNode and on a single core (my cluster is a 4-node setup, each node
having 32 cores). Each file has 3 million rows, and I have to analyze each
row (ignoring none) and create a set of info from it.

Is there a way I can parallelize the processing of the file, either on
other nodes or by using the remaining cores of the same node?



Demo code:

val recordsRDD =
  SparkContext.sequenceFile[NullWritable, BytesWritable](FilePath, 256) /* to parallelize */

val infoRDD = recordsRDD.map(f => info_func())

val hdfs_RDD = infoRDD.reduceByKey(_ + _, 48) /* makes 48 partitions */

hdfs_RDD.saveAsNewAPIHadoopFile



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-running-slow-for-small-hadoop-files-of-10-mb-size-tp4526.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




--
André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/


Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

This message and its attachments are confidential and intended exclusively
for their addressees. If you are not the intended recipient of this message,
please delete it and notify the sender.


Re: Spark is slow

2014-04-22 Thread Nicholas Chammas
How long are the count() steps taking? And how many partitions are pairs1 and
triples initially divided into? You can see this by doing
pairs1._jrdd.splits().size(), for example.

If you just need to count the number of distinct keys, is it faster if you
do the following instead of groupByKey().count()?

g1 = pairs1.map(lambda (k,v): k).distinct().count()
g2 = triples.map(lambda (k,v): k).distinct().count()

Nick


On Mon, Apr 21, 2014 at 10:42 PM, Joe L selme...@yahoo.com wrote:

 g1 = pairs1.groupByKey().count()
 pairs1 = pairs1.groupByKey(g1).cache()
 g2 = triples.groupByKey().count()
 pairs2 = pairs2.groupByKey(g2)

 pairs = pairs2.join(pairs1)

 Hi, I want to implement hash-partitioned joining as shown above. But
 somehow, it is taking so long to perform. As I understand, the above
 joining
 is only implemented locally right since they are partitioned respectively?
 After we partition, they will reside in the same node. So, isn't it
 supposed
 to be fast when we partition by keys. Thank you.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-slow-tp4539p4577.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark is slow

2014-04-21 Thread Marcelo Vanzin
Hi Joe,

On Mon, Apr 21, 2014 at 11:23 AM, Joe L selme...@yahoo.com wrote:
 And, I  haven't gotten any answers to my questions.

One thing that might explain that is that, at least for me, all (and I
mean *all*) of your messages are ending up in my GMail spam folder,
complaining that GMail can't verify that it really comes from
yahoo.com.

No idea why that's happening or how to fix it.

-- 
Marcelo


Re: Spark is slow

2014-04-21 Thread John Meagher
Yahoo made some changes that drive mailing list posts into spam
folders:  http://www.virusbtn.com/blog/2014/04_15.xml

On Mon, Apr 21, 2014 at 2:50 PM, Marcelo Vanzin van...@cloudera.com wrote:
 Hi Joe,

 On Mon, Apr 21, 2014 at 11:23 AM, Joe L selme...@yahoo.com wrote:
 And, I  haven't gotten any answers to my questions.

 One thing that might explain that is that, at least for me, all (and I
 mean *all*) of your messages are ending up in my GMail spam folder,
 complaining that GMail can't verify that it really comes from
 yahoo.com.

 No idea why that's happening or how to fix it.

 --
 Marcelo


Re: Spark is slow

2014-04-21 Thread Nicholas Chammas
I'm seeing the same thing as Marcelo, Joe. All your mail is going to my
Spam folder. :(

With regard to your questions, I would suggest in general adding some more
technical detail to them. It will be difficult for people to give you
suggestions if all they are told is that "Spark is slow". How does your Spark
setup differ from your MR/Hive setup? What operations are you doing? What
do you see in the Spark UI? What have you tried doing to isolate or
identify the reason for the slowness? Etc.

Nick


On Mon, Apr 21, 2014 at 2:50 PM, Marcelo Vanzin van...@cloudera.com wrote:

 Hi Joe,

 On Mon, Apr 21, 2014 at 11:23 AM, Joe L selme...@yahoo.com wrote:
  And, I  haven't gotten any answers to my questions.

 One thing that might explain that is that, at least for me, all (and I
 mean *all*) of your messages are ending up in my GMail spam folder,
 complaining that GMail can't verify that it really comes from
 yahoo.com.

 No idea why that's happening or how to fix it.

 --
 Marcelo



Re: Spark is slow

2014-04-21 Thread Sam Bessalah
Why don't you start by explaining what kind of operation you're running on
Spark that you expected to be faster than Hadoop MapReduce? Maybe we could
start there. And yes, this mailing list is very busy since many people are
getting into Spark; it's hard to answer everyone.
On 21 Apr 2014 20:23, Joe L selme...@yahoo.com wrote:

 It is claimed that Spark is 10x or 100x faster than MapReduce and Hive,
 but since I started using it I haven't seen any faster performance. It is
 taking 2 minutes to run map and join tasks over just 2 GB of data, whereas
 Hive was taking just a few seconds to join 2 tables over the same data. And
 I haven't gotten any answers to my questions. I don't understand the purpose
 of this group, and there is not enough documentation on Spark and its usage.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-slow-tp4539.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Spark is slow

2014-04-21 Thread Joe L
g1 = pairs1.groupByKey().count() 
pairs1 = pairs1.groupByKey(g1).cache() 
g2 = triples.groupByKey().count() 
pairs2 = pairs2.groupByKey(g2) 

pairs = pairs2.join(pairs1) 

Hi, I want to implement a hash-partitioned join as shown above. But
somehow it is taking very long to perform. As I understand it, the above join
should only execute locally, right, since both sides are partitioned the same
way? After we partition, matching keys will reside on the same node. So isn't
it supposed to be fast when we partition by keys? Thank you.
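
For comparison, a minimal PySpark sketch of the usual way to hash-partition both pair RDDs before a join; the partition count is an arbitrary example, and whether the join can then avoid re-shuffling depends on the Spark version and API:

num_partitions = 48                                  # arbitrary example value
p1 = pairs1.partitionBy(num_partitions).cache()      # hash-partition by key and keep in memory
p2 = pairs2.partitionBy(num_partitions).cache()
pairs = p2.join(p1, num_partitions)                  # join with the same partition count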



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-slow-tp4539p4577.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.