Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-24 Thread Andy Davidson
Hi Sabarish


Thanks for the suggestion. I did not know about wholeTextFiles()

By the way, once again your suggestion about repartitioning was spot on! My run
time for count() went from elapsed time 0:56:42.902407 to elapsed
time 0:00:03.215143 on a data set of about 34M (4720 records).
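
In case it helps anyone else, here is roughly what the fix looked like. The
path and the partition count below are placeholders, not my exact values, and
sc is the already-created SparkContext as in the snippets quoted below:

# Each small HDFS file becomes its own partition, so collapse them first.
raw = sc.textFile("hdfs:///path/to/tweets/*")   # placeholder path
tweets = raw.repartition(12)                    # placeholder partition count
print(tweets.count())

# Sab's alternative: read whole files as (fileName, content) pairs.
pairs = sc.wholeTextFiles("hdfs:///path/to/tweets/*")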

Andy

From:  Sabarish Sasidharan 
Date:  Monday, November 23, 2015 at 7:57 PM
To:  Andrew Davidson 
Cc:  Xiao Li , "user @spark" 
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> 
> Hi Andy
> 
> You can try sc.wholeTextFiles() instead of sc.textFile()
> 
> Regards
> Sab
> 
> On 24-Nov-2015 4:01 am, "Andy Davidson"  wrote:
>> Hi Xiao and Sabarish
>> 
>> Using the Stage tab on the UI, it turns out you can see how many
>> partitions there are. If I do nothing I get 228155 partitions (this
>> confirms what Sabarish said). I tried coalesce(3), but RDD.count()
>> fails. I thought that given I have 3 workers, and 1/3 of the data would
>> easily fit into memory, this would be a good choice.
>> 
>> If I use coalesce(30), count() works. However it still seems slow: it took
>> 2.42 min to read 4720 records. My total data set size is 34M.
>> 
>> Any suggestions on how to choose the number of partitions?
>> 
>> ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>> 
>> 
>> The data was originally collected using Spark Streaming. I noticed that the
>> number of default partitions == the number of files created on HDFS. I bet
>> each file is one Spark Streaming mini-batch. I suspect that if I concatenate
>> these into a small number of files things will run much faster, and that
>> I would not need to call coalesce() at all; coalesce() may be taking a lot of
>> time. Any suggestions on how to choose the number of files?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> 
>> From:  Xiao Li 
>> Date:  Monday, November 23, 2015 at 12:21 PM
>> To:  Andrew Davidson 
>> Cc:  Sabarish Sasidharan , "user @spark"
>> 
>> Subject:  Re: newbie : why are thousands of empty files being created on
>> HDFS?
>> 
>> 
>>> >In your case, maybe you can try to call the function coalesce?
>>> >Good luck,
>>> >
>>> >Xiao Li
>>> >
>>> >2015-11-23 12:15 GMT-08:00 Andy Davidson :
>>> >
>>> >Hi Sabarish
>>> >
>>> >I am but a simple padawan :-) I do not understand your answer. Why would
>>> >Spark be creating so many empty partitions? My real problem is my
>>> >application is very slow. I happened to notice thousands of empty files
>>> >being created. I thought this is a hint to why my app is slow.
>>> >
>>> >My program calls sample(0.01).filter(not null).saveAsTextFile(). This
>>> >takes about 35 min to scan 500,000 JSON strings and write 5000 to disk.
>>> >The total data written is about 38M.
>>> >
>>> >The data is read from HDFS. My understanding is Spark cannot know in
>>> >advance how HDFS partitioned the data. Spark knows I have a master and 3
>>> >slave machines. It knows how many workers/executors are assigned to my
>>> >job. I would expect Spark to be smart enough not to create more
>>> >partitions than I have worker machines?
>>> >
>>> >Also, given I am not using any key/value operations like join() or doing
>>> >multiple scans, I would assume my app would not benefit from partitioning.
>>> >
>>> >
>>> >Kind regards
>>> >
>>> >Andy
>>> >
>>> >
>>> >From:  Sabarish Sasidharan 
>>> >Date:  Saturday, November 21, 2015 at 7:20 PM
>>> >To:  Andrew Davidson 
>>> >Cc:  "user @spark" 
>>> >Subject:  Re: newbie : why are thousands of empty files being created on
>>> >HDFS?
>>> >
>>> >
>>> >
>>> >Those are empty partitions. I don't see the number of partitions
>>> >specified in code. That then implies the default parallelism config is
>>> >being used and is set to a very high number, the sum of empty + non empty
>>> >files.
>>> >Regards
>>> >Sab
>>> >On 21-Nov-2015 11:59 pm, "Andy Davidson" 
>>> >wrote:
>>> >
>>> >I started working on a very simple ETL pipeline for a POC. It reads in a
>>> >data set of tweets stored as JSON strings in HDFS, randomly selects
>>> >1% of the observations, and writes them to HDFS. It seems to run very
>>> >slowly, e.g. writing 4720 observations takes 1:06:46.577795. I
>>> >also noticed that RDD saveAsTextFile() is creating thousands of empty
>>> >files.
>>> >
>>> >I assume creating all these empty files must be slowing down the system.
>>> >Any idea why this is happening? Do I have to write a script to periodically
>>> >remove empty files?
>>> >
>>> >
>>> >Kind regards
>>> >
>>> >Andy
>>> >
>>> >tweetStrings = sc.textFile(inputDataURL)
>>> >
>>> >
>>> >def removeEmptyLines(line):
>>> >    if line:
>>> >        return True
>>> >    else:
>>> 

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-24 Thread Andy Davidson
Hi Don

I went to a presentation given by Professor Ion Stoica. He mentioned that
Python was a little slower in general because of the type system. I do not
remember all of his comments; I think the context had to do with Spark SQL
and DataFrames.

I wonder if the Python issue is similar to the boxing/unboxing issue in
Java?

Andy


From:  Don Drake 
Date:  Monday, November 23, 2015 at 7:10 PM
To:  Andrew Davidson 
Cc:  Xiao Li , Sabarish Sasidharan
, "user @spark" 
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> I'm seeing similar slowness in saveAsTextFile(), but only in Python.
> 
> I'm sorting data in a DataFrame, then transforming it into an RDD, and then
> calling coalesce(1).saveAsTextFile().
> 
> I converted the Python to Scala and the run times were similar, except for the
> saveAsTextFile() stage. The Scala version was much faster.
> 
> When looking at the executor logs during that stage, I see the following when
> running the Scala code:
> 15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600
> non-empty blocks out of 600 blocks
> 
> 15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184 remote
> fetches in 64 ms
> 
> 15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (0  time so far)
> 
> 15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (1  time so far)
> 
> 15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (2  times so far)
> 
> 15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (3  times so far)
> 
> 15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (4  times so far)
> 
> 15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (5  times so far)
> 
> 15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (6  times so far)
> 
> 15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (7  times so far)
> 
> 15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort data
> of 146.0 MB to disk (8  times so far)
> 
>  
> 
> When running the Python version during the saveAsTextFile() stage, I see:
> 
> 15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
> init = 144, finish = 16041
> 
> 15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 82 ms
> 
> 15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot = -415,
> init = 447, finish = 12148
> 
> 15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 129 ms
> 
> 15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot = -372,
> init = 398, finish = 11424
> 
> 15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 70 ms
> 
> 15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot = -378,
> init = 403, finish = 14455
> 
> 15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 62 ms
> 
> 15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot = -366,
> init = 381, finish = 11853
> 
> 15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 59 ms
> 
> 15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot = -392,
> init = 403, finish = 15364
> 
> 15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300
> non-empty blocks out of 300 blocks
> 
> 15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231 remote
> fetches in 48 ms
> 
> 
> 
> The Python version is approximately 10 times slower than the Scala version.
> Any ideas why?
> 
> 
> 
> -Don
> 
> 
> On Mon, Nov 23, 2015 at 4:31 PM, Andy Davidson 
> wrote:
>> Hi Xiao and Sabarish
>> 
>> Using the Stage tab on the UI. It turns out you can see how many
>> partitions there are. If I did nothing I would have 228155 partition.
>> (This confirms what Sabarish said). I tried coalesce(3). RDD.count()
>> fails. I though given I 

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Andy Davidson
Hi Sabarish

I am but a simple padawan :-) I do not understand your answer. Why would
Spark be creating so many empty partitions? My real problem is that my
application is very slow. I happened to notice thousands of empty files
being created and thought this might be a hint as to why my app is slow.

My program calls sample(0.01).filter(not null).saveAsTextFile(). This takes
about 35 min to scan 500,000 JSON strings and write 5000 to disk. The total
data written is about 38M.

The data is read from HDFS. My understanding is Spark cannot know in
advance how HDFS partitioned the data. Spark knows I have a master and 3
slave machines. It knows how many workers/executors are assigned to my job. I
would expect Spark to be smart enough not to create more partitions than I
have worker machines?

Also, given I am not using any key/value operations like join() or doing
multiple scans, I would assume my app would not benefit from partitioning.
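
In case it is useful, this is roughly how I have been inspecting the
partitioning (inputDataURL is the same HDFS path used in the snippet quoted
below; this helper is just a quick sanity check, not part of my real job):

tweetStrings = sc.textFile(inputDataURL)

# One partition per input split; with many tiny files this is huge.
print("partitions: %d" % tweetStrings.getNumPartitions())

# Count records per partition to see how many partitions are empty.
sizes = tweetStrings.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print("empty partitions: %d" % sum(1 for s in sizes if s == 0))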


Kind regards

Andy


From:  Sabarish Sasidharan 
Date:  Saturday, November 21, 2015 at 7:20 PM
To:  Andrew Davidson 
Cc:  "user @spark" 
Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?

> 
> Those are empty partitions. I don't see the number of partitions specified in
> code. That then implies the default parallelism config is being used and is
> set to a very high number, the sum of empty + non empty files.
> 
> Regards
> Sab
> 
> On 21-Nov-2015 11:59 pm, "Andy Davidson" 
> wrote:
>> I started working on a very simple ETL pipeline for a POC. It reads in a data
>> set of tweets stored as JSON strings in HDFS, randomly selects 1% of
>> the observations, and writes them to HDFS. It seems to run very slowly, e.g.
>> writing 4720 observations takes 1:06:46.577795. I
>> also noticed that RDD saveAsTextFile() is creating thousands of empty files.
>> 
>> I assume creating all these empty files must be slowing down the system. Any
>> idea why this is happening? Do I have to write a script to periodically remove
>> empty files?
>> 
>> 
>> Kind regards
>> 
>> Andy
>> 
>> tweetStrings = sc.textFile(inputDataURL)
>> 
>> 
>> def removeEmptyLines(line):
>>     if line:
>>         return True
>>     else:
>>         emptyLineCount.add(1)
>>         return False
>> 
>> emptyLineCount = sc.accumulator(0)
>> sample = (tweetStrings.filter(removeEmptyLines)
>>           .sample(withReplacement=False, fraction=0.01, seed=345678))
>> 
>> startTime = datetime.datetime.now()
>> sample.saveAsTextFile(saveDataURL)
>> 
>> endTime = datetime.datetime.now()
>> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>> 
>> elapsed time:1:06:46.577795
>> 
>> 
>> Total number of empty files
>> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>> 223515
>> Total number of files with data
>> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>> 4642
>> 
>> 
>> 
>> I randomly picked a part file. Its size is 9251




Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Xiao Li
In your case, maybe you can try to call the function coalesce?

Good luck,

Xiao Li
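
For example, applied to the snippet quoted below (8 is just an arbitrary
example count, not a recommendation):

# Fewer partitions before the write means fewer part files on HDFS.
sample.coalesce(8).saveAsTextFile(saveDataURL)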

2015-11-23 12:15 GMT-08:00 Andy Davidson :

> Hi Sabarish
>
> I am but a simple padawan :-) I do not understand your answer. Why would
> Spark be creating so many empty partitions? My real problem is my
> application is very slow. I happened to notice thousands of empty files
> being created. I thought this is a hint to why my app is slow.
>
> My program calls sample(0.01).filter(not null).saveAsTextFile(). This
> takes about 35 min to scan 500,000 JSON strings and write 5000 to disk.
> The total data written is about 38M.
>
> The data is read from HDFS. My understanding is Spark cannot know in
> advance how HDFS partitioned the data. Spark knows I have a master and 3
> slave machines. It knows how many workers/executors are assigned to my job.
> I would expect Spark to be smart enough not to create more partitions than
> I have worker machines?
>
> Also, given I am not using any key/value operations like join() or doing
> multiple scans, I would assume my app would not benefit from partitioning.
>
>
> Kind regards
>
> Andy
>
>
> From: Sabarish Sasidharan 
> Date: Saturday, November 21, 2015 at 7:20 PM
> To: Andrew Davidson 
> Cc: "user @spark" 
> Subject: Re: newbie : why are thousands of empty files being created on
> HDFS?
>
> Those are empty partitions. I don't see the number of partitions specified
> in code. That then implies the default parallelism config is being used and
> is set to a very high number, the sum of empty + non empty files.
>
> Regards
> Sab
> On 21-Nov-2015 11:59 pm, "Andy Davidson" 
> wrote:
>
>> I started working on a very simple ETL pipeline for a POC. It reads in a
>> data set of tweets stored as JSON strings in HDFS, randomly selects
>> 1% of the observations, and writes them to HDFS. It seems to run very
>> slowly, e.g. writing 4720 observations takes 1:06:46.577795. I
>> also noticed that RDD saveAsTextFile() is creating thousands of empty
>> files.
>>
>> I assume creating all these empty files must be slowing down the system. Any
>> idea why this is happening? Do I have to write a script to periodically remove
>> empty files?
>>
>>
>> Kind regards
>>
>> Andy
>>
>> tweetStrings = sc.textFile(inputDataURL)
>>
>> def removeEmptyLines(line):
>>     if line:
>>         return True
>>     else:
>>         emptyLineCount.add(1)
>>         return False
>>
>> emptyLineCount = sc.accumulator(0)
>> sample = (tweetStrings.filter(removeEmptyLines)
>>           .sample(withReplacement=False, fraction=0.01, seed=345678))
>>
>> startTime = datetime.datetime.now()
>> sample.saveAsTextFile(saveDataURL)
>>
>> endTime = datetime.datetime.now()
>> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>>
>> elapsed time:1:06:46.577795
>>
>>
>>
>> *Total number of empty files*
>>
>> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>>
>> 223515
>>
>> *Total number of files with data*
>>
>> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>>
>> 4642
>>
>>
>> I randomly picked a part file. Its size is 9251
>>
>>


Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Andy Davidson
Hi Xiao and Sabarish

Using the Stage tab on the UI, it turns out you can see how many
partitions there are. If I do nothing I get 228155 partitions (this
confirms what Sabarish said). I tried coalesce(3), but RDD.count()
fails. I thought that given I have 3 workers, and 1/3 of the data would
easily fit into memory, this would be a good choice.

If I use coalesce(30), count() works. However it still seems slow: it took
2.42 min to read 4720 records. My total data set size is 34M.

Any suggestions on how to choose the number of partitions?

('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
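
For reference, here is roughly what I tried; the partition counts are the
ones mentioned above and the comments are just my current understanding:

rdd = sc.textFile(inputDataURL)

# coalesce() merges existing partitions without a shuffle by default,
# so coalesce(3) squeezes everything into 3 tasks; count() failed for me.
bad = rdd.coalesce(3)

# With 30 partitions count() works, but it is still slow.
ok = rdd.coalesce(30)
print(ok.count())

# repartition(n) is coalesce(n, shuffle=True): it rebalances the data
# evenly across n partitions at the cost of a shuffle.
balanced = rdd.repartition(30)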


The data was originally collected using Spark Streaming. I noticed that the
number of default partitions == the number of files created on HDFS. I bet
each file is one Spark Streaming mini-batch. I suspect that if I concatenate
these into a small number of files things will run much faster, and that
I would not need to call coalesce() at all; coalesce() may be taking a lot of
time. Any suggestions on how to choose the number of files?
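
If concatenating is the right approach, I assume a one-off job along these
lines would do it (the output path and the file count are placeholders):

# Read the thousands of small streaming output files once and write them
# back as a small number of larger files.
sc.textFile(inputDataURL).coalesce(30).saveAsTextFile("hdfs:///path/to/consolidated")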

Kind regards

Andy


From:  Xiao Li 
Date:  Monday, November 23, 2015 at 12:21 PM
To:  Andrew Davidson 
Cc:  Sabarish Sasidharan , "user @spark"

Subject:  Re: newbie : why are thousands of empty files being created on
HDFS?


>In your case, maybe you can try to call the function coalesce?
>Good luck, 
>
>Xiao Li
>
>2015-11-23 12:15 GMT-08:00 Andy Davidson :
>
>Hi Sabarish
>
>I am but a simple padawan :-) I do not understand your answer. Why would
>Spark be creating so many empty partitions? My real problem is my
>application is very slow. I happened to notice thousands of empty files
>being created. I thought this is a hint to why my app is slow.
>
>My program calls sample(0.01).filter(not null).saveAsTextFile(). This
>takes about 35 min to scan 500,000 JSON strings and write 5000 to disk.
>The total data written is about 38M.
>
>The data is read from HDFS. My understanding is Spark cannot know in
>advance how HDFS partitioned the data. Spark knows I have a master and 3
>slave machines. It knows how many workers/executors are assigned to my
>job. I would expect Spark to be smart enough not to create more
>partitions than I have worker machines?
>
>Also, given I am not using any key/value operations like join() or doing
>multiple scans, I would assume my app would not benefit from partitioning.
>
>
>Kind regards
>
>Andy
>
>
>From:  Sabarish Sasidharan 
>Date:  Saturday, November 21, 2015 at 7:20 PM
>To:  Andrew Davidson 
>Cc:  "user @spark" 
>Subject:  Re: newbie : why are thousands of empty files being created on
>HDFS?
>
>
>
>Those are empty partitions. I don't see the number of partitions
>specified in code. That then implies the default parallelism config is
>being used and is set to a very high number, the sum of empty + non empty
>files.
>Regards
>Sab
>On 21-Nov-2015 11:59 pm, "Andy Davidson" 
>wrote:
>
>I started working on a very simple ETL pipeline for a POC. It reads in a
>data set of tweets stored as JSON strings in HDFS, randomly selects
>1% of the observations, and writes them to HDFS. It seems to run very
>slowly, e.g. writing 4720 observations takes 1:06:46.577795. I
>also noticed that RDD saveAsTextFile() is creating thousands of empty
>files.
>
>I assume creating all these empty files must be slowing down the system.
>Any idea why this is happening? Do I have to write a script to periodically
>remove empty files?
>
>
>Kind regards
>
>Andy
>
>tweetStrings = sc.textFile(inputDataURL)
>
>
>def removeEmptyLines(line):
>    if line:
>        return True
>    else:
>        emptyLineCount.add(1)
>        return False
>
>emptyLineCount = sc.accumulator(0)
>sample = (tweetStrings.filter(removeEmptyLines)
>          .sample(withReplacement=False, fraction=0.01, seed=345678))
>
>
>startTime = datetime.datetime.now()
>sample.saveAsTextFile(saveDataURL)
>
>endTime = datetime.datetime.now()
>print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>
>
>elapsed time:1:06:46.577795
>
>Total number of empty files
>$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>223515
>Total number of files with data
>$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>4642
>
>I randomly picked a part file. Its size is 9251
>



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-23 Thread Don Drake
I'm seeing similar slowness in saveAsTextFile(), but only in Python.

I'm sorting data in a DataFrame, then transforming it into an RDD, and then
calling coalesce(1).saveAsTextFile().
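
Roughly, the Python side looks like this; the source, the column name, and the
row transform are placeholders for my actual code, and sc is the existing
SparkContext:

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = sqlContext.read.parquet("hdfs:///path/to/input")   # placeholder source

sorted_df = df.orderBy("some_column")                   # placeholder column
lines = sorted_df.rdd.map(lambda row: "|".join(str(c) for c in row))
lines.coalesce(1).saveAsTextFile("hdfs:///path/to/output")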

I converted the Python to Scala and the run times were similar, except for
the saveAsTextFile() stage. The Scala version was much faster.

When looking at the executor logs during that stage, I see the following
when running the Scala code:

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600
non-empty blocks out of 600 blocks

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184
remote fetches in 64 ms

15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (0  time so far)

15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (1  time so far)

15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (2  times so far)

15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (3  times so far)

15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (4  times so far)

15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (5  times so far)

15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (6  times so far)

15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (7  times so far)

15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (8  times so far)



When running the Python version during the saveAsTextFile() stage, I see:

15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
init = 144, finish = 16041

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 82 ms

15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot =
-415, init = 447, finish = 12148

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 129 ms

15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot =
-372, init = 398, finish = 11424

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 70 ms

15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot =
-378, init = 403, finish = 14455

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 62 ms

15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot =
-366, init = 381, finish = 11853

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 59 ms

15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot =
-392, init = 403, finish = 15364

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 48 ms


The Python version is approximately 10 times slower than the Scala
version. Any ideas why?


-Don

On Mon, Nov 23, 2015 at 4:31 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Xiao and Sabarish
>
> Using the Stage tab on the UI, it turns out you can see how many
> partitions there are. If I do nothing I get 228155 partitions (this
> confirms what Sabarish said). I tried coalesce(3), but RDD.count()
> fails. I thought that given I have 3 workers, and 1/3 of the data would
> easily fit into memory, this would be a good choice.
>
> If I use coalesce(30), count() works. However it still seems slow: it took
> 2.42 min to read 4720 records. My total data set size is 34M.
>
> Any suggestions on how to choose the number of partitions?
>
> ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>
>
> The data was originally collected using Spark Streaming. I noticed that the
> number of default partitions == the number of files created on HDFS. I bet
> each file is one Spark Streaming mini-batch. I suspect that if I concatenate
> these into a small number of files things will run much faster, and that
> I would not need to call coalesce() at all; coalesce() may be taking a lot of
> time. Any suggestions on how to choose the number of files?
>
> Kind regards
>
> Andy
>
>
> From:  Xiao Li 
> Date:  Monday, November 23, 2015 at 

Re: newbie : why are thousands of empty files being created on HDFS?

2015-11-21 Thread Sabarish Sasidharan
Those are empty partitions. I don't see the number of partitions specified
in code. That implies the default parallelism config is being used, and it is
set to a very high number: the sum of empty and non-empty files.

Regards
Sab
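
For illustration, something along these lines (the value 24 and the paths are
placeholders; note that for file-based RDDs the partition count follows the
input splits, so an explicit coalesce before saving is what actually caps the
number of output files):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("partitionDefaults")             # placeholder name
        .set("spark.default.parallelism", "24"))     # example value
sc = SparkContext(conf=conf)

rdd = sc.textFile("hdfs:///path/to/tweets/*")         # placeholder path
rdd.coalesce(24).saveAsTextFile("hdfs:///path/to/out")
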
On 21-Nov-2015 11:59 pm, "Andy Davidson" 
wrote:

> I started working on a very simple ETL pipeline for a POC. It reads in a
> data set of tweets stored as JSON strings in HDFS, randomly selects
> 1% of the observations, and writes them to HDFS. It seems to run very
> slowly, e.g. writing 4720 observations takes 1:06:46.577795. I
> also noticed that RDD saveAsTextFile() is creating thousands of empty files.
>
> I assume creating all these empty files must be slowing down the system. Any
> idea why this is happening? Do I have to write a script to periodically remove
> empty files?
>
>
> Kind regards
>
> Andy
>
> tweetStrings = sc.textFile(inputDataURL)
>
> def removeEmptyLines(line):
>     if line:
>         return True
>     else:
>         emptyLineCount.add(1)
>         return False
>
> emptyLineCount = sc.accumulator(0)
> sample = (tweetStrings.filter(removeEmptyLines)
>           .sample(withReplacement=False, fraction=0.01, seed=345678))
>
> startTime = datetime.datetime.now()
> sample.saveAsTextFile(saveDataURL)
>
> endTime = datetime.datetime.now()
> print("elapsed time:%s" % (datetime.datetime.now() - startTime))
>
> elapsed time:1:06:46.577795
>
>
>
> *Total number of empty files*
>
> $ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
>
> 223515
>
> *Total number of files with data*
>
> $ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
>
> 4642
>
>
> I randomly picked a part file. Its size is 9251
>
>