Hi, if you now want to write 1 file per partition, that's actually built into Spark as *saveAsTextFile*(*path*), which writes the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS, or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
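A rough, untested sketch of what that could look like with your pair RDD (I'm assuming points is a JavaPairRDD<Integer, GeoTimeDataTupel> and I'm reusing your pro properties for the output path; adjust the path assembly to your setup):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import scala.Tuple2;

    // Format every record as one line of text for the output files.
    JavaRDD<String> lines = points.map(
        new Function<Tuple2<Integer, GeoTimeDataTupel>, String>() {
            public String call(Tuple2<Integer, GeoTimeDataTupel> entry) {
                return "Cluster: " + entry._1 + ", Point: " + entry._2.toString();
            }
        });

    // Writes one part-00000, part-00001, ... file per partition into the directory.
    lines.saveAsTextFile(pro.getProperty("spark.output") + "/results");

If you really need everything in a single file, a coalesce(1) before saveAsTextFile funnels all the data through one task, which also avoids several executors rewriting the same HDFS file at once (most likely why rows went missing on the YARN cluster).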
On Wed, Jun 10, 2015 at 4:44 AM, Pa Rö <paul.roewer1...@googlemail.com> wrote:

> hi,
>
> i have an idea to solve my problem: i want to write one file for each spark
> partition, but i don't know how to get the current partition suffix/ID in my
> call function?
>
> points.foreachPartition(
>     new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() {
>
>         private static final long serialVersionUID = -7210897529331503565L;
>
>         public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entry) throws Exception {
>             while (entry.hasNext()) {
>                 Tuple2<Integer, GeoTimeDataTupel> temp = entry.next();
>
>                 try {
>                     FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
>                     Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
>                 }
>                 catch (Exception e) {
>                     e.printStackTrace();
>                 }
>             }
>         }
>     }
> );
>
> 2015-06-09 15:34 GMT+02:00 Pa Rö <paul.roewer1...@googlemail.com>:
>
>> hi community,
>>
>> i want to append results to one file. if i run my function locally, everything
>> works fine; if i run it on a yarn cluster, i lose some rows.
>>
>> here is my function to write:
>>
>> points.foreach(
>>     new VoidFunction<Tuple2<Integer, GeoTimeDataTupel>>() {
>>
>>         private static final long serialVersionUID = 2459995649387229261L;
>>
>>         public void call(Tuple2<Integer, GeoTimeDataTupel> entry) throws Exception {
>>             try {
>>                 FileSystem fs = FileSystem.get(new URI(pro.getProperty("hdfs.namenode")), new Configuration());
>>                 Path pt = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results");
>>
>>                 if (fs.exists(pt)) {
>>                     FSDataInputStream in = fs.open(pt);
>>                     Path pt_temp = new Path(fs.getHomeDirectory() + pro.getProperty("spark.output") + "/results_temp");
>>                     backup(fs.getConf(), fs, in, pt_temp);
>>                     in.close();
>>
>>                     FSDataOutputStream out = fs.create(pt, true);
>>                     FSDataInputStream backup = fs.open(pt_temp);
>>
>>                     int offset = 0;
>>                     int bufferSize = 4096;
>>                     int result = 0;
>>
>>                     byte[] buffer = new byte[bufferSize];
>>                     // pre-read a part of the content from the input stream
>>                     result = backup.read(offset, buffer, 0, bufferSize);
>>                     // keep reading until the input stream no longer fills the whole buffer
>>                     while (result == bufferSize) {
>>                         out.write(buffer);
>>                         // read the next segment by moving the offset pointer
>>                         offset += bufferSize;
>>                         result = backup.read(offset, buffer, 0, bufferSize);
>>                     }
>>
>>                     if (result > 0 && result < bufferSize) {
>>                         for (int i = 0; i < result; i++) {
>>                             out.write(buffer[i]);
>>                         }
>>                     }
>>                     out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>>                     out.close();
>>                 }
>>                 else {
>>                     BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt)));
>>                     bw.write("Cluster: " + entry._1 + ", Point: " + entry._2.toString() + "\n");
>>                     bw.close();
>>                 }
>>             } catch (Exception e) {
>>                 e.printStackTrace();
>>             }
>>         }
>>
>>         public void backup(Configuration conf, FileSystem fs, FSDataInputStream sourceContent, Path pt_temp) throws Exception {
>>             FSDataOutputStream out = fs.create(pt_temp, true);
>>             IOUtils.copyBytes(sourceContent, out, 4096, false);
>>             out.close();
>>         }
>>     }
>> );
>>
>> where is my fault? or is there a function to write (append) to the hadoop hdfs?
>>
>> best regards,
>> paul
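Regarding the partition suffix/ID from the foreachPartition attempt above: if you do want to keep writing the files yourself, one way (an untested sketch, assuming a Spark version that already has TaskContext.get() and reusing your pro/points/GeoTimeDataTupel names) is to ask the TaskContext for the ID of the partition the task is processing and use it as the file suffix:

    import java.net.URI;
    import java.util.Iterator;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    points.foreachPartition(
        new VoidFunction<Iterator<Tuple2<Integer, GeoTimeDataTupel>>>() {

            public void call(Iterator<Tuple2<Integer, GeoTimeDataTupel>> entries) throws Exception {
                // ID of the partition this task is processing: 0, 1, 2, ...
                int partitionId = TaskContext.get().partitionId();

                FileSystem fs = FileSystem.get(
                    new URI(pro.getProperty("hdfs.namenode")), new Configuration());
                // One file per partition, suffixed with the partition ID,
                // so concurrent tasks never write to the same path.
                Path pt = new Path(fs.getHomeDirectory()
                    + pro.getProperty("spark.output") + "/results-" + partitionId);

                FSDataOutputStream out = fs.create(pt, true);
                while (entries.hasNext()) {
                    Tuple2<Integer, GeoTimeDataTupel> entry = entries.next();
                    out.writeBytes("Cluster: " + entry._1 + ", Point: " + entry._2 + "\n");
                }
                out.close();
            }
        }
    );

mapPartitionsWithIndex would hand you the same index as an argument if you'd rather not touch TaskContext.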