Hi Andy,

You can try sc.wholeTextFiles() instead of sc.textFile().
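For example, a rough sketch (untested; assuming the same inputDataURL as in
your code). wholeTextFiles() returns one (path, content) pair per file, so
thousands of small files no longer mean thousands of partitions:

pairs = sc.wholeTextFiles(inputDataURL)                      # (filename, content) pairs
tweetStrings = pairs.flatMap(lambda kv: kv[1].splitlines())  # one tweet per record
print(tweetStrings.getNumPartitions())                       # far fewer than with textFile()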
Regards
Sab

On 24-Nov-2015 4:01 am, "Andy Davidson" <a...@santacruzintegration.com> wrote:

> Hi Xiao and Sabarish
>
> Using the Stage tab on the UI, it turns out you can see how many
> partitions there are. If I did nothing I would have 228155 partitions.
> (This confirms what Sabarish said.) I tried coalesce(3); RDD.count()
> fails. I thought, given I have 3 workers and 1/3 of the data would easily
> fit into memory, this would be a good choice.
>
> If I use coalesce(30), count works. However, it still seems slow. It took
> 2.42 min to read 4720 records. My total data set size is 34M.
>
> Any suggestions on how to choose the number of partitions?
>
> ('spark.executor.memory', '2G') ('spark.driver.memory', '2G')
>
> The data was originally collected using Spark Streaming. I noticed that
> the number of default partitions == the number of files created on HDFS.
> I bet each file is one Spark Streaming mini-batch. I suspect that if I
> concatenate these into a small number of files, things will run much
> faster and I would not need to call coalesce(); I suspect coalesce() is
> taking a lot of time. Any suggestions on how to choose the number of
> files?
>
> Kind regards
>
> Andy
>
> From: Xiao Li <gatorsm...@gmail.com>
> Date: Monday, November 23, 2015 at 12:21 PM
> To: Andrew Davidson <a...@santacruzintegration.com>
> Cc: Sabarish Sasidharan <sabarish.sasidha...@manthan.com>, "user @spark" <user@spark.apache.org>
> Subject: Re: newbie : why are thousands of empty files being created on HDFS?
>
> >In your case, maybe you can try to call the function coalesce?
> >
> >Good luck,
> >
> >Xiao Li
> >
> >2015-11-23 12:15 GMT-08:00 Andy Davidson <a...@santacruzintegration.com>:
> >
> >Hi Sabarish
> >
> >I am but a simple padawan :-) I do not understand your answer. Why would
> >Spark be creating so many empty partitions? My real problem is that my
> >application is very slow. I happened to notice thousands of empty files
> >being created and thought this might be a hint as to why my app is slow.
> >
> >My program calls sample(0.01).filter(not null).saveAsTextFile(). This
> >takes about 35 min to scan 500,000 JSON strings and write 5000 to disk.
> >The total data written is 38M.
> >
> >The data is read from HDFS. My understanding is that Spark cannot know in
> >advance how HDFS partitioned the data. Spark knows I have a master and 3
> >slave machines. It knows how many workers/executors are assigned to my
> >job. I would expect Spark to be smart enough not to create more
> >partitions than I have worker machines.
> >
> >Also, given I am not using any key/value operations like join() or doing
> >multiple scans, I would assume my app would not benefit from
> >partitioning.
> >
> >Kind regards
> >
> >Andy
> >
> >From: Sabarish Sasidharan <sabarish.sasidha...@manthan.com>
> >Date: Saturday, November 21, 2015 at 7:20 PM
> >To: Andrew Davidson <a...@santacruzintegration.com>
> >Cc: "user @spark" <user@spark.apache.org>
> >Subject: Re: newbie : why are thousands of empty files being created on HDFS?
> >
> >Those are empty partitions. I don't see the number of partitions
> >specified in code. That then implies the default parallelism config is
> >being used, and it is set to a very high number: the sum of empty +
> >non-empty files.
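> >
> >For illustration, a rough sketch (untested; assuming the tweetStrings
> >RDD from your code below) that confirms and then reduces the partition
> >count:
> >
> >print(tweetStrings.getNumPartitions())   # one partition per input file
> >fewer = tweetStrings.coalesce(30)        # merge partitions, no shuffle
> >print(fewer.getNumPartitions())          # now 30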
> >
> >Regards
> >Sab
> >
> >On 21-Nov-2015 11:59 pm, "Andy Davidson" <a...@santacruzintegration.com> wrote:
> >
> >I started working on a very simple ETL pipeline for a POC. It reads in a
> >data set of tweets stored as JSON strings in HDFS, randomly selects 1% of
> >the observations, and writes them back to HDFS. It seems to run very
> >slowly, e.g. writing 4720 observations takes 1:06:46.577795. I also
> >noticed that RDD saveAsTextFile is creating thousands of empty files.
> >
> >I assume creating all these empty files must be slowing down the system.
> >Any idea why this is happening? Do I have to write a script to
> >periodically remove empty files?
> >
> >Kind regards
> >
> >Andy
> >
> >import datetime
> >
> >tweetStrings = sc.textFile(inputDataURL)
> >
> >emptyLineCount = sc.accumulator(0)
> >
> >def removeEmptyLines(line):
> >    if line:
> >        return True
> >    else:
> >        emptyLineCount.add(1)
> >        return False
> >
> >sample = (tweetStrings.filter(removeEmptyLines)
> >          .sample(withReplacement=False, fraction=0.01, seed=345678))
> >
> >startTime = datetime.datetime.now()
> >sample.saveAsTextFile(saveDataURL)
> >endTime = datetime.datetime.now()
> >print("elapsed time:%s" % (endTime - startTime))
> >
> >elapsed time:1:06:46.577795
> >
> >Total number of empty files:
> >$ hadoop fs -du {saveDataURL} | grep '^0' | wc -l
> >223515
> >
> >Total number of files with data:
> >$ hadoop fs -du {saveDataURL} | grep -v '^0' | wc -l
> >4642
> >
> >I randomly picked a part file. Its size is 9251.
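> >
> >A minimal, untested variation of the write step above: coalescing the
> >sampled RDD before saving should produce a handful of part files
> >instead of one (mostly empty) file per input partition:
> >
> >fewParts = sample.coalesce(30)          # ~30 output partitions, no shuffle
> >fewParts.saveAsTextFile(saveDataURL)    # ~30 part files instead of ~228k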