1. Be careful: HDFS is better suited to large files than to bunches of small files.
2. If that's really what you want, roll your own:

def writeLines(iterator: Iterator[(String, String)]) = {
  val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
  try {
    while (iterator.hasNext) {
      val item = iterator.next()
      val key = item._1
      val line = item._2
      // Reuse an open writer for this key, or open (and cache) a new one.
      val writer = writers.getOrElseUpdate(key, {
        val path = args(1) + key
        val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
        new BufferedWriter(new OutputStreamWriter(outputStream))
      })
      writer.write(line)
      writer.newLine()
    }
  } finally {
    writers.values.foreach(_.close())
  }
}

val inputData = sc.textFile(args(0))
val keyValue = inputData.map(line => (extractKey(line), line)) // extractKey derives your key (e.g. the create date) from the line
val partitions = keyValue.partitionBy(new MyPartitioner(10))
partitions.foreachPartition(writeLines)

class MyPartitioner(partitions: Int) extends Partitioner {
  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = {
    (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions // make sure lines with the same key end up in the same partition
  }
}

2014-08-12 21:34 GMT+08:00 Fengyun RAO <raofeng...@gmail.com>: > 1. Be careful: HDFS is better suited to large files than to bunches of small files. > > 2. If that's really what you want, roll your own.
> > def writeLines(iterator: Iterator[(String, String)]) = { > val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) > map > try { > while (iterator.hasNext) { > val item = iterator.next() > val key = item._1 > val line = item._2 > val writer = writers.getOrElseUpdate(key, { > val path = args(1) + key > val outputStream = FileSystem.get(new Configuration()).create(new > Path(path)) > new BufferedWriter(new OutputStreamWriter(outputStream)) > }) > writer.write(line) > writer.newLine() > } > } finally { > writers.values.foreach(_.close()) > } > } > > val inputData = sc.textFile(args(0)) > val keyValue = inputData.map(line => (extractKey(line), line)) > val partitions = keyValue.partitionBy(new MyPartitioner(10)) > partitions.foreachPartition(writeLines) > > > class MyPartitioner(partitions: Int) extends Partitioner { > override def numPartitions: Int = partitions > > override def getPartition(key: Any): Int = { > (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions // make > sure lines with the same key end up in the same partition > > } > } > > > > 2014-08-11 20:42 GMT+08:00 诺铁 <noty...@gmail.com>: > > hi, >> >> I have googled and found a similar question without a good answer: >> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark >> >> In short, I would like to separate raw data and divide it by some key — for >> example, create date — and put it in a directory named by date, so that I can >> easily access a portion of the data later. >> >> For now I have to extract all keys and then filter by key and save to >> file repeatedly. Is there any good way to do this? Or maybe I shouldn't do >> such a thing? >> > >