Re: Sorted Multiple Outputs
Hi Eugene,

In my case the list of values that I want to sort and write to a separate file is fairly small, so I solved it as follows:

    // Needs org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.{FileSystem, Path}
    // and java.io.{BufferedOutputStream, PrintWriter}.
    .groupByKey().foreach(e => {
      val hadoopConfig = new Configuration()
      val hdfs = FileSystem.get(hadoopConfig)
      val newPath = rootPath + "/" + e._1
      val dstream = hdfs.create(new Path(newPath))
      val bstream = new BufferedOutputStream(dstream, 100 * 1024)
      val writer = new PrintWriter(bstream)
      // Sort this key's values by their first field before writing them out.
      e._2.toList.sortBy(_._1).foreach(sub => {
        writer.println(Utils.getDateStr(sub._1) + "," + sub._2 + "," + sub._3)
      })
      writer.flush()
      writer.close()
    })

I'm not sure what I changed in the way I write to HDFS, but this approach worked.

Thanks a lot!

On 13 August 2015 at 01:06, Eugene Morozov fathers...@list.ru wrote:
> Yiannis, sorry for the late response. [...]
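The sort-and-write step of the groupByKey solution in this message can be tried without a cluster. The following is only a sketch under stated assumptions: it writes to the local filesystem through java.nio instead of HDFS, the Record type and the SortedPerKeyWriter name are hypothetical stand-ins for the thread's data, and the raw timestamp is printed instead of calling Utils.getDateStr. It also closes the writer in a finally block, so the file handle is released even when a write fails.

```scala
import java.io.PrintWriter
import java.nio.file.{Files, Path}

object SortedPerKeyWriter {
  // Hypothetical record shape: (timestamp, field1, field2).
  type Record = (Long, String, String)

  /** Writes one file per key under `root`, with lines ordered by timestamp. */
  def write(root: Path, grouped: Map[String, Iterable[Record]]): Unit = {
    Files.createDirectories(root)
    grouped.foreach { case (key, values) =>
      val writer = new PrintWriter(Files.newBufferedWriter(root.resolve(key)))
      try {
        // The values of one key are assumed small enough to sort in memory.
        values.toList.sortBy(_._1).foreach { case (ts, a, b) =>
          writer.println(s"$ts,$a,$b")
        }
      } finally {
        writer.close() // close() flushes and runs even if a write throws
      }
    }
  }
}
```

Swapping the local writer for hdfs.create(new Path(...)) would bring this back to the HDFS version shown in the message.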
Re: Sorted Multiple Outputs
Yiannis,

Sorry for the late response. It is indeed not possible to create a new RDD inside foreachPartition, so you have to write the data manually. I haven't tried that and haven't hit such an exception, but you might try writing the file locally and then uploading it to HDFS. FileSystem has a specific method for that: copyFromLocalFile.

Another approach would be to split the RDD into multiple RDDs by key. You can get the distinct keys, collect them on the driver, loop over the keys, and filter a new RDD out of the original one for each key:

    for (key <- keys) {
      rdd.filter(_._1 == key).saveAsTextFile(/* output path for this key */)
    }

It might help to cache the original RDD.

On 16 Jul 2015, at 12:21, Yiannis Gkoufas johngou...@gmail.com wrote:
> Hi Eugene, thanks for your response! [...]

Eugene Morozov
fathers...@list.ru
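The split-by-key loop described in this message could look roughly like the sketch below. It assumes Spark is on the classpath, that the keys are strings and the values are Longs, and that the set of distinct keys is small enough to collect on the driver; the SplitByKey object and writeSortedByKey name are hypothetical. Note that saveAsTextFile writes a directory of part files per key rather than a single file, and that it fails if the target directory already exists.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SplitByKey {
  /** Writes one output directory per key under `rootPath`, values sorted ascending. */
  def writeSortedByKey(records: RDD[(String, Long)], rootPath: String): Array[String] = {
    // Cache first: the RDD is scanned once for the keys and once per key.
    records.cache()
    val keys = records.keys.distinct().collect()
    for (key <- keys) {
      records
        .filter { case (k, _) => k == key }
        .values
        .sortBy(v => v)                      // total order across the part files
        .saveAsTextFile(s"$rootPath/$key")
    }
    records.unpersist()
    keys
  }
}
```

Each iteration launches a separate job, so the cost grows with the number of keys; with many keys the groupByKey approach from the latest reply is likely the cheaper option.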
Re: Sorted Multiple Outputs
Hi Eugene,

Thanks for your response! Your recommendation makes sense; that's more or less what I tried. The problem I am facing is that inside foreachPartition() I cannot create a new RDD and use saveAsTextFile. It would probably make sense to write directly to HDFS using the Java API. When I tried that, I got errors similar to this:

    Failed on local exception: java.io.InterruptedIOException: Interruped while waiting for IO on channel java.nio.channels.SocketChannel

It is probably hitting a race condition. Has anyone else faced this situation? Any suggestions?

Thanks a lot!

On 15 July 2015 at 14:04, Eugene Morozov fathers...@list.ru wrote:
> Yiannis,
>
> It looks like you might explore another approach:
>
>     sc.textFile("input/path")
>       .map()                                  // your own implementation
>       .partitionBy(new HashPartitioner(num))
>       .groupBy()                              // your own implementation; the result is a pair RDD of key vs. Iterable of values
>       .foreachPartition()
>
> In the last step you could sort all values for a key and store them in a separate file, even in the same directory as the files for the other keys. HashPartitioner guarantees that all values for a specific key reside in just one partition, but one partition might contain more than one key (with its values). I'm not sure about that last point, but it shouldn't be a big deal, as you would iterate over tuple<key, Iterable<value>> and store each key in its own file.
>
> On 15 Jul 2015, at 03:23, Yiannis Gkoufas johngou...@gmail.com wrote:
>> Hi there, I have been using the approach described here: [...]
>
> Eugene Morozov
> fathers...@list.ru
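The HashPartitioner point raised in the quoted reply can be checked with a few lines of plain Scala. This is an illustrative re-implementation, not Spark's own class: it assumes the documented behaviour that a key goes to partition key.hashCode modulo the number of partitions (made non-negative), with null keys going to partition 0. The HashPlacement object and partitionFor name are hypothetical.

```scala
object HashPlacement {
  /** Partition index for `key`, mirroring hash partitioning by hashCode. */
  def partitionFor(key: Any, numPartitions: Int): Int = {
    if (key == null) 0
    else {
      val raw = key.hashCode % numPartitions
      // % can be negative in Scala/Java, so shift into [0, numPartitions).
      if (raw < 0) raw + numPartitions else raw
    }
  }

  def main(args: Array[String]): Unit = {
    // The same key always maps to the same partition, but with 2 partitions
    // the keys "a" and "c" end up sharing one.
    Seq("a", "b", "c").foreach { k =>
      println(s"$k -> partition ${partitionFor(k, 2)}")
    }
  }
}
```

So the code running inside foreachPartition has to handle several keys per partition, which is why iterating over the (key, values) tuples and opening one file per key is the safe pattern.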
Sorted Multiple Outputs
Hi there,

I have been using the approach described here:

http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job

In addition to that, I was wondering if there is a way to customize the order of the values contained in each file.

Thanks a lot!