Hi,

if you want to write one file per partition, that's actually built into Spark:

*saveAsTextFile*(*path*) - Write the elements of the dataset as a text file
(or set of text files) in a given directory in the local filesystem, HDFS
or any other Hadoop-supported file system. Spark will call toString on each
element to convert it to a line of text in the file.
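
For example, something like this (just a rough sketch, not tested; it reuses
the points, pro and GeoTimeDataTupel names from your snippets and the same
line format you build by hand, plus org.apache.spark.api.java.function.Function):

    points.map(new Function<Tuple2<Integer, GeoTimeDataTupel>, String>() {
        public String call(Tuple2<Integer, GeoTimeDataTupel> entry) {
            // one output line per element, same format as in your foreach
            return "Cluster: " + entry._1 + ", Point: " + entry._2.toString();
        }
    }).saveAsTextFile(pro.getProperty("spark.output") + "/results");
    // writes one part-00000, part-00001, ... file per partition into the
    // "results" directory; the directory must not exist yet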

On Wed, Jun 10, 2015 at 4:44 AM, Pa Rö <paul.roewer1...@googlemail.com>
wrote:

> hi,
>
> I have an idea to solve my problem: I want to write one file per Spark partition,
> but how do I get the actual partition suffix/ID inside my call function?
>
> points.foreachPartition(
>         new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() {
>
>             private static final long serialVersionUID = -7210897529331503565L;
>
>             public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception {
>                 while (entry.hasNext()) {
>                     Tuple2<Integer, GeoTimeDataTupel> temp = entry.next();
>
>                     try {
>                         FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
>                         Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
>                         // how do I get the partition suffix/ID here for the file name?
>                     }
>                     catch (Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>             }
>         }
> );
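
If you really need the partition index inside your function, one way is
mapPartitionsWithIndex, which hands the index to the closure (a sketch, not
tested; it keeps your pro/property names and GeoTimeDataTupel type and needs
org.apache.spark.api.java.function.Function2 and java.util.Collections). On
recent Spark versions TaskContext.get().partitionId() inside foreachPartition
should work as well:

    points.mapPartitionsWithIndex(
            new Function2<Integer, Iterator<Tuple2<Integer, GeoTimeDataTupel>>, Iterator<String>>() {
                public Iterator<String> call(Integer partitionId, Iterator<Tuple2<Integer, GeoTimeDataTupel>> entries) throws Exception {
                    FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                    // one file per partition, suffixed with the partition index
                    Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results-" + partitionId);
                    BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt, true)));
                    while (entries.hasNext()) {
                        Tuple2<Integer, GeoTimeDataTupel> entry = entries.next();
                        bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
                    }
                    bw.close();
                    return Collections.<String>emptyList().iterator();
                }
            }, false).count(); // count() only forces the job to run
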
>
> 2015-06-09 15:34 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>
>> hi community,
>>
>> I want to append my results to one file. When I run it locally, my function works fine,
>> but when I run it on a YARN cluster, I lose some of the rows.
>>
>> Here is my function to write:
>>
>> points.foreach(
>>         new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
>>
>>             private static final long serialVersionUID = 2459995649387229261L;
>>
>>             public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
>>                 try {
>>                     FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
>>                     Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
>>
>>                     if (fs.exists(pt)) {
>>                         // copy the existing file aside, then rewrite it with the old content plus the new line
>>                         FSDataInputStream in = fs.open(pt);
>>                         Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
>>                         backup(fs.getConf(), fs, in, pt_temp);
>>                         in.close();
>>
>>                         FSDataOutputStream out = fs.create(pt, true);
>>                         FSDataInputStream backup = fs.open(pt_temp);
>>
>>                         int offset = 0;
>>                         int bufferSize = 4096;
>>                         int result = 0;
>>
>>                         byte[] buffer = new byte[bufferSize];
>>                         // pre-read one chunk from the backup file
>>                         result = backup.read(offset, buffer, 0, bufferSize);
>>                         // keep reading until the buffer is no longer filled completely
>>                         while (result == bufferSize) {
>>                             out.write(buffer);
>>                             // read the next segment by moving the offset pointer
>>                             offset += bufferSize;
>>                             result = backup.read(offset, buffer, 0, bufferSize);
>>                         }
>>
>>                         if (result > 0 && result < bufferSize) {
>>                             for (int i = 0; i < result; i++) {
>>                                 out.write(buffer[i]);
>>                             }
>>                         }
>>                         out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>>                         out.close();
>>                     }
>>                     else {
>>                         BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
>>                         bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>>                         bw.close();
>>                     }
>>                 } catch (Exception e) {
>>                     e.printStackTrace();
>>                 }
>>             }
>>
>>             public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
>>                 FSDataOutputStream out = fs.create(pt_temp, true);
>>                 IOUtils.copyBytes(sourceContent, out, 4096, false);
>>                 out.close();
>>             }
>>         }
>> );
>>
>> Where is my mistake? Or is there a function to write (append) to HDFS?
>>
>> best regards,
>> paul
>>
>
>
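
On the lost rows: in the foreach, every executor task does this
copy-and-rewrite of the same HDFS file at the same time, so concurrent tasks
most likely overwrite each other's output; locally there is effectively a
single writer, which is why it only shows up on YARN. HDFS also gives you no
safe concurrent append. The usual pattern is to let Spark write one part file
per partition (saveAsTextFile above) and, if you really need a single file,
merge the parts afterwards on the driver, e.g. (a sketch, assuming Hadoop 2.x
where FileUtil.copyMerge exists):

    FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
    Path partsDir = new Path(pro.getProperty("spark.output") + "/results");     // directory written by saveAsTextFile
    Path merged   = new Path(pro.getProperty("spark.output") + "/results.txt"); // single merged file (name is just an example)
    FileUtil.copyMerge(fs, partsDir, fs, merged, false, fs.getConf(), null);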
