Hi colleagues,
In Hadoop I have many folders containing small files. Therefore I read the
contents of all folders, union the small files, and write the unioned data into
a single folder containing one file. Afterwards I delete the small files and
the corresponding folders.
I see two potential problems, and I would like to get your opinion on them:
1. When reading all the files inside the folders into the master program,
there might be so many files that the master program runs out of memory.
To prevent this, I thought about checking the total file size of the folders
and only reading folders in as long as there is enough memory to handle them.
Do you think this is a viable solution, or is there a better way to handle
this problem?
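To make the size check from point 1 concrete, here is a small sketch of what I have in mind (the helper name selectWithinBudget and the budget are my own invention; in practice the per-folder sizes would come from fs.getContentSummary(folder.getPath).getLength):

```scala
// Sketch: keep only a prefix of folders whose cumulative size stays
// within a given byte budget. folderSizes pairs each folder path with
// its total content size in bytes.
def selectWithinBudget(folderSizes: Seq[(String, Long)],
                       budgetBytes: Long): Seq[String] = {
  // cumulative(i) = total size of the first i + 1 folders
  val cumulative = folderSizes.scanLeft(0L)(_ + _._2).tail
  folderSizes.zip(cumulative)
    .takeWhile { case (_, cum) => cum <= budgetBytes }
    .map { case ((path, _), _) => path }
}
```

The folders that do not fit would then simply be left for the next consolidation run.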
2. The other problem: I am doing a unionAll to merge the contents of all the
files. In my opinion this causes the data to be brought to a single master,
where it is then unioned. So the same problem might occur: the application
runs out of memory. My proposed solution would again be to union only while
the size does not exceed the available memory. Is there a better solution?
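For point 2, an alternative I have been considering is to split the folders into size-bounded batches and union and write each batch on its own, appending everything into the consolidated folder. Here is a sketch of the grouping step (batchBySize and the cap are my own names, just to illustrate the idea):

```scala
// Sketch: greedily group (path, sizeInBytes) pairs into batches whose
// total size stays at or below capBytes. Each batch would then be
// unioned and written separately with SaveMode.Append.
def batchBySize(folderSizes: Seq[(String, Long)],
                capBytes: Long): Seq[Seq[String]] = {
  val batches = scala.collection.mutable.ListBuffer.empty[Seq[String]]
  var current = scala.collection.mutable.ListBuffer.empty[String]
  var used = 0L
  for ((path, size) <- folderSizes) {
    if (current.nonEmpty && used + size > capBytes) {
      batches += current.toList // close the full batch
      current = scala.collection.mutable.ListBuffer.empty[String]
      used = 0L
    }
    current += path
    used += size
  }
  if (current.nonEmpty) batches += current.toList
  batches.toList
}
```

Each resulting batch could then go through the read/union/write steps from the code below independently.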
For a better understanding, you can have a look at my code at the bottom of
this mail.
I would be glad to hear about your experiences, as I assume this is a fairly
general problem.
Thanks & best, Alex
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SQLContext, SaveMode}

val sqlContext = new SQLContext(sc)

// Get the filesystem
val conf = new Configuration()
val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)

// Get the relevant folders: everything except the most recently modified one
val directoryStatus = fs.listStatus(
  new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))
val latestFolder = directoryStatus.maxBy(_.getModificationTime)
val toWorkFolders = directoryStatus.filter(
  _.getModificationTime < latestFolder.getModificationTime)

// Aggregate the folder contents into a single DataFrame
val parquetFiles = toWorkFolders.map(folder =>
  sqlContext.read.parquet(folder.getPath.toString))
val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))

// Assemble the part-files into one partition and write them out
mergedParquet.coalesce(1)
  .write.mode(SaveMode.Append)
  .partitionBy(PARQUET_PARTITIONBY_COLUMNS: _*)
  .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
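For completeness, the cleanup step I mentioned at the top selects every folder except the newest one (the same filter as in the code above) and deletes it. The selection can be kept as a small pure helper (foldersToDelete is my own name for it), with the actual removal then done via fs.delete(new Path(p), true):

```scala
// Sketch: given (path, modificationTime) pairs, keep the most recently
// modified folder and return all others as deletion candidates.
def foldersToDelete(folders: Seq[(String, Long)]): Seq[String] = {
  if (folders.isEmpty) Seq.empty
  else {
    val latestTime = folders.maxBy(_._2)._2
    folders.filter(_._2 < latestTime).map(_._1)
  }
}
```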