I'm seeing similar slowness in saveAsTextFile(), but only in Python.

I sort data in a DataFrame, transform it into an RDD, and then call
coalesce(1).saveAsTextFile().

I converted the Python code to Scala and the run times were similar, except
for the saveAsTextFile() stage, where the Scala version was much faster.

When looking at the executor logs during that stage, I see the following
when running the Scala code:

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Getting 600
non-empty blocks out of 600 blocks

15/11/23 20:51:26 INFO storage.ShuffleBlockFetcherIterator: Started 184
remote fetches in 64 ms

15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (0  time so far)

15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (1  time so far)

15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (2  times so far)

15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (3  times so far)

15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (4  times so far)

15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (5  times so far)

15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (6  times so far)

15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (7  times so far)

15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (8  times so far)
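
To put a number on the spilling in the excerpt above, the lines can be tallied
with a few lines of Python. This is only a sketch: SCALA_LOG is the excerpt
pasted from this message (mail-wrapped lines kept as-is), and
total_spilled_mb is a hypothetical helper name, not part of Spark.

```python
import re

# Spill lines copied from the Scala executor log excerpt in this message.
SCALA_LOG = """
15/11/23 20:51:30 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (0  time so far)
15/11/23 20:51:35 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (1  time so far)
15/11/23 20:51:40 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (2  times so far)
15/11/23 20:51:45 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (3  times so far)
15/11/23 20:51:50 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (4  times so far)
15/11/23 20:51:54 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (5  times so far)
15/11/23 20:51:59 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (6  times so far)
15/11/23 20:52:04 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (7  times so far)
15/11/23 20:52:09 INFO sort.UnsafeExternalSorter: Thread 57 spilling sort
data of 146.0 MB to disk (8  times so far)
"""

# \s+ lets the pattern match across the line breaks added by the mail client.
SPILL_RE = re.compile(r"spilling sort\s+data of\s+([\d.]+)\s+MB to disk")


def total_spilled_mb(log_text):
    """Return (number of spill messages, sum of the reported sizes in MB)."""
    sizes = [float(size) for size in SPILL_RE.findall(log_text)]
    return len(sizes), sum(sizes)


spill_count, spilled_mb = total_spilled_mb(SCALA_LOG)
print("%d spills, %.1f MB spilled in total" % (spill_count, spilled_mb))
```

For this excerpt the tally is 9 spills and 1314.0 MB, all from a single
thread (Thread 57).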



When running the Python version during the saveAsTextFile() stage, I see:

15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
init = 144, finish = 16041

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:03 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 82 ms

15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot =
-415, init = 447, finish = 12148

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:15 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 129 ms

15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot =
-372, init = 398, finish = 11424

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:27 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 70 ms

15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot =
-378, init = 403, finish = 14455

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:42 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 62 ms

15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot =
-366, init = 381, finish = 11853

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:04:54 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 59 ms

15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot =
-392, init = 403, finish = 15364

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Getting 300
non-empty blocks out of 300 blocks

15/11/23 21:05:10 INFO storage.ShuffleBlockFetcherIterator: Started 231
remote fetches in 48 ms


The Python version is approximately 10 times slower than the Scala
version.  Any ideas why?
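
For anyone who wants to compare numbers, here is a rough sketch that pulls
the PythonRunner timings out of the excerpt above and adds them up. The
helper name python_runner_totals is hypothetical, PYTHON_LOG is pasted from
this message, and I am assuming the values are in milliseconds.

```python
import re

# PythonRunner timing lines copied from the executor log excerpt in this
# message (mail-wrapped lines kept as-is).
PYTHON_LOG = """
15/11/23 21:04:03 INFO python.PythonRunner: Times: total = 16190, boot = 5,
init = 144, finish = 16041
15/11/23 21:04:15 INFO python.PythonRunner: Times: total = 12180, boot =
-415, init = 447, finish = 12148
15/11/23 21:04:27 INFO python.PythonRunner: Times: total = 11450, boot =
-372, init = 398, finish = 11424
15/11/23 21:04:42 INFO python.PythonRunner: Times: total = 14480, boot =
-378, init = 403, finish = 14455
15/11/23 21:04:54 INFO python.PythonRunner: Times: total = 11868, boot =
-366, init = 381, finish = 11853
15/11/23 21:05:10 INFO python.PythonRunner: Times: total = 15375, boot =
-392, init = 403, finish = 15364
"""

# \s* tolerates the line breaks that the mail client inserted mid-record.
TIMES_RE = re.compile(
    r"Times:\s*total\s*=\s*(-?\d+),\s*boot\s*=\s*(-?\d+),"
    r"\s*init\s*=\s*(-?\d+),\s*finish\s*=\s*(-?\d+)"
)


def python_runner_totals(log_text):
    """Return the 'total' value of every PythonRunner timing record."""
    return [int(match.group(1)) for match in TIMES_RE.finditer(log_text)]


totals_ms = python_runner_totals(PYTHON_LOG)
print("tasks: %d, sum: %d ms, mean: %.1f ms"
      % (len(totals_ms), sum(totals_ms), sum(totals_ms) / len(totals_ms)))
```

In this excerpt the six records add up to 81543, and in every record almost
all of `total` is accounted for by `finish`.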


-Don

On Mon, Nov 23, 2015 at 4:31 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Xiao and Sabarish
>
> Using the Stages tab in the UI, it turns out you can see how many
> partitions there are. If I do nothing I get 228155 partitions.
> (This confirms what Sabarish said.) I tried coalesce(3), but RDD.count()
> fails. I thought that, given I have 3 workers and 1/3 of the data would
> easily fit into memory, this would be a good choice.
>
> If I use coalesce(30), count() works. However, it still seems slow. It took
> 2.42 min to read 4720 records. My total data set size is 34M.
>
> Any suggestions on how to choose the number of partitions?
>
>  ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>
>
> The data was originally collected using Spark Streaming. I noticed that the
> number of default partitions == the number of files created on HDFS. I bet
> each file is one Spark Streaming mini-batch. I suspect that if I concatenate
> these into a small number of files, things will run much faster. I suspect
> I would not need to call coalesce(), and that coalesce() is taking a lot of
> time. Any suggestions on how to choose the number of files?
>
> Kind regards
>
> Andy
>
>
> From:  Xiao Li <gatorsm...@gmail.com>
> Date:  Monday, November 23, 2015 at 12:21 PM
> To:  Andrew Davidson <a...@santacruzintegration.com>
> Cc:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>, "user @spark"
> <user@spark.apache.org>
> Subject:  Re: newbie : why are thousands of empty files being created on
> HDFS?
>
>
> >In your case, maybe you can try to call the function coalesce?
> >Good luck,
> >
> >Xiao Li
> >
> >2015-11-23 12:15 GMT-08:00 Andy Davidson <a...@santacruzintegration.com>:
> >
> >Hi Sabarish
> >
> >I am but a simple padawan :-) I do not understand your answer. Why would
> >Spark be creating so many empty partitions? My real problem is that my
> >application is very slow. I happened to notice thousands of empty files
> >being created. I thought this might be a hint as to why my app is slow.
> >
> >My program calls sample(0.01).filter(not null).saveAsTextFile(). This
> >takes about 35 min to scan 500,000 JSON strings and write 5000 to disk.
> >The total data written is 38M.
> >
> >The data is read from HDFS. My understanding is that Spark cannot know in
> >advance how HDFS partitioned the data. Spark knows I have a master and 3
> >slave machines. It knows how many workers/executors are assigned to my
> >job. I would expect Spark to be smart enough not to create more
> >partitions than I have worker machines?
> >
> >Also, given that I am not using any key/value operations like Join() or
> >doing multiple scans, I would assume my app would not benefit from
> >partitioning.
> >
> >
> >Kind regards
> >
> >Andy
> >
> >
> >From:  Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
> >Date:  Saturday, November 21, 2015 at 7:20 PM
> >To:  Andrew Davidson <a...@santacruzintegration.com>
> >Cc:  "user @spark" <user@spark.apache.org>
> >Subject:  Re: newbie : why are thousands of empty files being created on
> >HDFS?
> >
> >
> >
> >Those are empty partitions. I don't see the number of partitions
> >specified in code. That then implies the default parallelism config is
> >being used and is set to a very high number, the sum of empty + non empty
> >files.
> >Regards
> >Sab
> >On 21-Nov-2015 11:59 pm, "Andy Davidson" <a...@santacruzintegration.com>
> >wrote:
> >
> >I started working on a very simple ETL pipeline for a POC. It reads in a
> >data set of tweets stored as JSON strings in HDFS, randomly selects
> >1% of the observations, and writes them to HDFS. It seems to run very
> >slowly. E.g., writing 4720 observations takes 1:06:46.577795. I
> >also noticed that RDD saveAsTextFile is creating thousands of empty
> >files.
> >
> >I assume creating all these empty files must be slowing down the system.
> >Any idea why this is happening? Do I have to write a script to
> >periodically remove empty files?
> >
> >
> >Kind regards
> >
> >Andy
> >
> >import datetime
> >
> ># sc, inputDataURL and saveDataURL are assumed to be defined elsewhere.
> >tweetStrings = sc.textFile(inputDataURL)
> >
> ># Accumulator counting the empty lines dropped by the filter.
> >emptyLineCount = sc.accumulator(0)
> >
> >def removeEmptyLines(line):
> >    if line:
> >        return True
> >    else:
> >        emptyLineCount.add(1)
> >        return False
> >
> >sample = (tweetStrings.filter(removeEmptyLines)
> >          .sample(withReplacement=False, fraction=0.01, seed=345678))
> >
> >startTime = datetime.datetime.now()
> >sample.saveAsTextFile(saveDataURL)
> >endTime = datetime.datetime.now()
> >print("elapsed time:%s" % (endTime - startTime))
> >
> >
> >elapsed time:1:06:46.577795
> >
> >Total number of empty files:
> >$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
> >223515
> >Total number of files with data:
> >$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
> >4642
> >
> >I randomly picked a part file. Its size is 9251


-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake
800-733-2143
