Understood, thank you.
Small files are a problem; I am considering processing the data before putting it into HDFS.
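
For example, I am thinking of something along these lines (an untested sketch, not code I have run): let one Spark job route every line into a sub-directory named after its key using Hadoop's MultipleTextOutputFormat, so each key ends up with a few part files instead of many tiny ones. KeyBasedOutputFormat, extractDate, inputPath and outputDir below are placeholder names, and sc is the usual shell SparkContext:

import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.HashPartitioner

// Route each (key, line) pair into a sub-directory named after its key,
// e.g. <outputDir>/2014-08-11/part-00000.
class KeyBasedOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    key.toString + "/" + name
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get() // drop the key, keep only the raw line in the files
}

// placeholders for the real paths and key function
val inputPath = "/path/to/raw/input"
val outputDir = "/path/to/output"
def extractDate(line: String): String = line.take(10) // assumes a leading yyyy-MM-dd timestamp

val byDate = sc.textFile(inputPath).map(line => (extractDate(line), line))
byDate
  .partitionBy(new HashPartitioner(10)) // keep all lines of one date in one partition
  .saveAsHadoopFile(outputDir, classOf[String], classOf[String], classOf[KeyBasedOutputFormat])

The partitionBy step is only there so that one date produces a handful of part files rather than one per task; whether 10 partitions is a good number depends on the data.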


On Tue, Aug 12, 2014 at 9:37 PM, Fengyun RAO <raofeng...@gmail.com> wrote:

> 1. Be careful: HDFS works better with large files than with lots of small files.
>
> 2. If that's really what you want, roll your own:
>
> // imports needed by the snippet below:
> import java.io.{BufferedWriter, OutputStreamWriter}
> import scala.collection.mutable
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.fs.{FileSystem, Path}
> import org.apache.spark.Partitioner
>
> def writeLines(iterator: Iterator[(String, String)]) = {
>   val writers = new mutable.HashMap[String, BufferedWriter] // (key, writer) map
>   try {
>     while (iterator.hasNext) {
>       val (key, line) = iterator.next()
>       val writer = writers.get(key) match {
>         case Some(w) => w
>         case None =>
>           val path = args(1) + key // args(1): output directory prefix
>           val outputStream = FileSystem.get(new Configuration()).create(new Path(path))
>           val w = new BufferedWriter(new OutputStreamWriter(outputStream))
>           writers(key) = w // remember the writer so it is reused and closed later
>           w
>       }
>       writer.write(line)
>       writer.newLine()
>     }
>   } finally {
>     writers.values.foreach(_.close())
>   }
> }
>
> val inputData = sc.textFile(args(0)) // args(0): input path
> // extractKey is a placeholder: derive the grouping key (e.g. the date) from a line
> val keyValue = inputData.map(line => (extractKey(line), line))
> val partitions = keyValue.partitionBy(new MyPartitioner(10))
> partitions.foreachPartition(writeLines)
>
> class MyPartitioner(partitions: Int) extends Partitioner {
>   override def numPartitions: Int = partitions
>
>   override def getPartition(key: Any): Int = {
>     // make sure lines with the same key end up in the same partition
>     (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions
>   }
> }
>
>
>
> 2014-08-11 20:42 GMT+08:00 诺铁 <noty...@gmail.com>:
>
>> hi,
>>
>> I have googled and found a similar question without a good answer:
>> http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark
>>
>> In short, I would like to separate the raw data by some key, for example
>> the creation date, and put it in a directory named after that date, so that
>> I can easily access a portion of the data later.
>>
>> For now I have to extract all the keys, then filter by each key and save to
>> a file repeatedly. Is there a good way to do this? Or maybe I shouldn't be
>> doing such a thing?
>>
>
>
