Andrew, according to http://stackoverflow.com/a/17241273/1136722 , what you described is the old way of doing this.
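For what it's worth, here is a rough, untested sketch of the same delimiter trick without building a per-call Configuration object, by setting the property on the SparkContext's own Hadoop configuration (I'm not 100% sure this is exactly what the linked answer does):

    import org.apache.hadoop.io.{LongWritable, Text}

    // sketch only: `sc` is the usual SparkContext (e.g. in the spark shell);
    // the property is set once on the context-wide Hadoop configuration, so
    // newAPIHadoopFile picks it up without an explicit Configuration argument
    sc.hadoopConfiguration.set("textinputformat.record.delimiter", "\n")

    def nlLZOfile(path: String) =
      sc.newAPIHadoopFile[LongWritable, Text, com.hadoop.mapreduce.LzoTextInputFormat](path)
        .map(_._2.toString)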
On Fri, Jan 3, 2014 at 5:43 PM, Andrew Ash <[email protected]> wrote:

> For hadoop properties I find the most reliable way to be to set them on a
> Configuration object and use a method on SparkContext that accepts that
> conf object.
>
> From working code:
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
> def nlLZOfile(path: String) = {
>   val conf = new Configuration
>   conf.set("textinputformat.record.delimiter", "\n")
>   sc.newAPIHadoopFile(path,
>     classOf[com.hadoop.mapreduce.LzoTextInputFormat], classOf[LongWritable],
>     classOf[Text], conf)
>     .map(_._2.toString)
> }
>
> On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia <[email protected]> wrote:
>
>> Thanks for clarifying this.
>>
>> I tried setting hadoop properties before constructing SparkContext, but
>> it had no effect.
>>
>> Where is the right place to set these properties?
>>
>> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> I believe Kryo is only used during RDD serialization (i.e. communication
>>> between nodes), not for saving. If you want to compress output, you can
>>> use the gzip or snappy codec like this:
>>>
>>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
>>> val codec = "org.apache.hadoop.io.compress.GzipCodec"   // for gzip
>>>
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", codec)
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")
>>>
>>> (That's for HDP2; for HDP1 the keys are different.)
>>>
>>> Regards
>>> Guillaume
>>>
>>> Hi,
>>>
>>> I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double, Double)],
>>> expecting the output binary to be smaller, but it is exactly the same size
>>> as when Kryo is not on.
>>>
>>> I've checked the log, and there is no trace of Kryo-related errors.
>>>
>>> The code looks something like:
>>>
>>> class MyRegistrator extends KryoRegistrator {
>>>   override def registerClasses(kryo: Kryo) {
>>>     kryo.setRegistrationRequired(true)
>>>     kryo.register(classOf[(Int, Int, Double, Double)])
>>>   }
>>> }
>>> System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>>>
>>> At the end, I tried to call:
>>>
>>> kryo.setRegistrationRequired(true)
>>>
>>> to make sure my class gets registered. But I found errors like:
>>>
>>> Exception in thread "DAGScheduler" com.esotericsoftware.kryo.KryoException:
>>> java.lang.IllegalArgumentException: Class is not registered: scala.math.Numeric$IntIsIntegral$
>>> Note: To register this class use: kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>>>
>>> It appears many Scala-internal types have to be registered in order to
>>> have full Kryo support.
>>>
>>> Any idea why my simple tuple type should not get Kryo benefits?
>>>
>>> --
>>> Guillaume PITEL, Président
>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>
>>> eXenSa S.A.S. <http://www.exensa.com/>
>>> 41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
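Also, on the Kryo error in the quoted thread above: with setRegistrationRequired(true), Kryo insists that the Scala-internal classes it runs into (such as the scala.math.Numeric$IntIsIntegral$ named in the exception) are registered as well. A rough, untested sketch of a registrator that follows the exception message's advice:

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator

    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.setRegistrationRequired(true)
        // the tuple type actually stored in the RDD
        kryo.register(classOf[(Int, Int, Double, Double)])
        // the class named in the quoted exception; other internal classes
        // reported the same way may need the same treatment
        kryo.register(Class.forName("scala.math.Numeric$IntIsIntegral$"))
      }
    }

Whether every such internal class has to be listed by hand, or registration-required is better left off for simple tuples, I can't say; this only shows the registration pattern the exception asks for.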
