Hi Alexander,

You may want to try the wholeTextFiles() method of SparkContext. Using that you could just do something like this:
    sc.wholeTextFiles("hdfs://input_dir")
      .saveAsSequenceFile("hdfs://output_dir")

wholeTextFiles returns an RDD of (filename, content) pairs:
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

You will not have to worry about managing memory as much with this approach, since each file becomes a single record in a distributed RDD rather than being collected on the driver. I have put two short sketches below your quoted mail.

-sujit

On Wed, Nov 4, 2015 at 2:12 AM, Alexander Lenz <a...@lenz.tk> wrote:

> Hi colleagues,
>
> In Hadoop I have a lot of folders containing small files. Therefore I read
> the content of all folders, union the small files, and write the unioned
> data into a single folder containing one file. Afterwards I delete the
> small files and the corresponding folders.
>
> I see two possible problems on which I would like to get your opinion:
>
> 1. When reading all the files inside the folders into the master program,
> there might be so many files that the master program runs out of memory.
> To prevent this I thought about checking the file size of the folders and
> only reading folders in as long as there is enough memory to handle the
> amount. Do you think this is a workable solution, or is there a better way
> to handle this problem?
>
> 2. The other problem: I am doing a unionAll to merge all the content of
> the files. In my opinion this causes the data to be brought to a single
> master, where it is then unioned. So the same problem might occur: the
> application runs out of memory. My proposed solution would again be to
> union only if the size does not exceed the available memory. Any better
> solution?
>
> For a better understanding you can have a look at my code at the bottom of
> the mail. I would be glad to hear from your experience, as I assume this
> problem is a general one.
>
> Thanks & Best, Alex
>
>
> import java.net.URI
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.sql.{SQLContext, SaveMode}
>
> val sqlContext = new SQLContext(sc)
>
> // get filesystem
> val conf = new Configuration()
> val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), conf)
>
> // get relevant folders: everything except the most recently modified one
> val directoryStatus = fs.listStatus(new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))
> val latestFolder = directoryStatus.maxBy(x => x.getModificationTime)
> val toWorkFolders = directoryStatus.filter(x => x.getModificationTime < latestFolder.getModificationTime)
>
> // aggregate folder content: one DataFrame per folder, then union them all
> val parquetFiles = toWorkFolders.map(folder => {
>   sqlContext.read.parquet(folder.getPath.toString)
> })
>
> val mergedParquet = parquetFiles.reduce((x, y) => x.unionAll(y))
>
> mergedParquet.coalesce(1) // assemble part-files into one partition
>   .write.mode(SaveMode.Append)
>   .partitionBy(PARQUET_PARTITIONBY_COLUMNS :_*)
>   .parquet("hdfs://sandbox.hortonworks.com/demo/parquet/consolidated/")
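First, a minimal sketch of the wholeTextFiles() approach applied to your staging layout. The glob and output path here are assumptions for illustration, and note that wholeTextFiles() decodes each file as text, so it suits plain-text inputs; Parquet files would still need the DataFrame route from your own snippet.

    // Assuming the same `sc` as in your snippet (e.g. from spark-shell).
    // Each small file becomes one (path, content) record, and the RDD is
    // distributed across executors, so nothing accumulates on the driver.
    val files = sc.wholeTextFiles(
      "hdfs://sandbox.hortonworks.com/demo/parquet/staging/*") // illustrative glob

    // Persist the whole batch as a single SequenceFile directory; the small
    // source files can then be deleted, as in your current job.
    files.saveAsSequenceFile(
      "hdfs://sandbox.hortonworks.com/demo/consolidated")      // illustrative path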
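Second, on your question 1: Hadoop's FileSystem.getContentSummary() gives you the total on-disk size of a folder, so you can cap a batch before reading it. A rough sketch; the 1 GB budget is an arbitrary example value, and keep in mind that on-disk size understates in-memory size for compressed formats like Parquet:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

    val fs = FileSystem.get(new URI("hdfs://sandbox.hortonworks.com/"), new Configuration())
    val folders = fs.listStatus(new Path("hdfs://sandbox.hortonworks.com/demo/parquet/staging/"))

    // Arbitrary example budget: stop adding folders once the running total
    // of their on-disk sizes would exceed it.
    val budgetBytes = 1L << 30 // 1 GB

    val (batch, _) = folders.foldLeft((Vector.empty[FileStatus], 0L)) {
      case ((kept, total), status) =>
        // getContentSummary() walks the folder and returns its total length in bytes.
        val size = fs.getContentSummary(status.getPath).getLength
        if (total + size <= budgetBytes) (kept :+ status, total + size)
        else (kept, total) // over budget: leave this folder for a later run
    }
    // `batch` now holds the folders that this run can safely consolidate.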