I understand, thank you. Small files are a problem, so I am considering processing the data before putting it into HDFS.
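A rough sketch of one alternative, assuming string keys and values: Hadoop's MultipleTextOutputFormat names each output file after its key, so the split-by-date write happens without hand-managed writers. The extractDate helper, the paths, and the class/object names below are made-up placeholders, not from this thread:

    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD functions (needed in Spark 1.x)

    // Route each record to <outputBase>/<key>/part-NNNNN instead of a single directory.
    class KeyedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
      override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
        key.asInstanceOf[String] + "/" + name

      // Drop the key from the file contents; keep only the original line.
      override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
    }

    object SplitByDate {
      // Placeholder: derive the date key from a raw line (adjust to the real format).
      def extractDate(line: String): String = line.take(10)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("split-by-date"))
        val inputPath  = args(0) // placeholder input path
        val outputBase = args(1) // placeholder output directory

        sc.textFile(inputPath)
          .map(line => (extractDate(line), line))
          .saveAsHadoopFile(outputBase, classOf[String], classOf[String], classOf[KeyedOutputFormat])

        sc.stop()
      }
    }

Each partition then writes ordinary part-files under its key's directory, so no writer bookkeeping is needed, though the small-files concern below still applies if many keys appear in every partition.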
On Tue, Aug 12, 2014 at 9:37 PM, Fengyun RAO <raofeng...@gmail.com> wrote:

> 1. Be careful: HDFS is better for large files, not bunches of small files.
>
> 2. If that's really what you want, roll your own:
>
> import java.io.{BufferedWriter, OutputStreamWriter}
> import scala.collection.mutable
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.Partitioner
>
> def writeLines(iterator: Iterator[(String, String)]) = {
>   val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
>   try {
>     while (iterator.hasNext) {
>       val item = iterator.next()
>       val key = item._1
>       val line = item._2
>       val writer = writers.get(key) match {
>         case Some(w) => w
>         case None =>
>           val path = args(1) + key // output directory prefix + key
>           val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
>           val w = new BufferedWriter(new OutputStreamWriter(outputStream))
>           writers(key) = w // remember the writer so it is reused for this key
>           w
>       }
>       writer.write(line)
>       writer.newLine()
>     }
>   } finally {
>     writers.values.foreach(_.close())
>   }
> }
>
> val inputData = sc.textFile(args(0)) // input path
> val keyValue = inputData.map(line => (extractKey(line), line)) // extractKey: derive the key (e.g. the date) from the line
> val partitions = keyValue.partitionBy(new MyPartitioner(10))
> partitions.foreachPartition(writeLines)
>
> class MyPartitioner(partitions: Int) extends Partitioner {
>   override def numPartitions: Int = partitions
>
>   override def getPartition(key: Any): Int = {
>     // make sure lines with the same key end up in the same partition
>     (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
>   }
> }
>
> 2014-08-12 21:34 GMT+08:00 Fengyun RAO <raofeng...@gmail.com>:
>
>> 2014-08-11 20:42 GMT+08:00 诺铁 <noty...@gmail.com>:
>>
>>> Hi,
>>>
>>> I have googled and found a similar question without a good answer:
>>> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
>>>
>>> In short, I would like to separate the raw data and divide it by some key, for example the creation date, and put it in a directory named by that date, so that I can easily access a portion of the data later.
>>>
>>> For now I have to extract all the keys, then filter by key and save to a file repeatedly. Is there any good way to do this? Or maybe I shouldn't do such a thing?
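For reference, the "extract all keys, then filter by key and save repeatedly" workaround described in the question would look roughly like this; extractDate and the paths are placeholders, and the input is scanned once per key unless the RDD is cached:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch of the per-key workaround: one full filter + save pass for every key.
    object FilterPerKey {
      def extractDate(line: String): String = line.take(10) // placeholder key extraction

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("filter-per-key"))
        val lines = sc.textFile(args(0)).cache() // cache, otherwise every key re-reads the input
        val outputBase = args(1)

        val keys = lines.map(extractDate).distinct().collect()
        for (key <- keys) {
          lines.filter(line => extractDate(line) == key)
               .saveAsTextFile(outputBase + "/" + key) // one directory per key
        }
        sc.stop()
      }
    }

This is the approach the partition-by-key code above is meant to replace, since it launches one job per key rather than a single pass over the data.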