Andrew, according to http://stackoverflow.com/a/17241273/1136722 , what you
described is the old way of doing this.
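
For anyone following along, one place these properties can be set is on the SparkContext's own Hadoop configuration, after the context is built and before the save runs. Below is a minimal sketch, not a definitive recipe. It assumes Spark is on the classpath and uses the Hadoop 2 key names quoted further down. The object name, app name, sample data and temp output path are made up for illustration, and DefaultCodec is my substitution for the Gzip/Snappy codecs mentioned in the thread.

```scala
import java.nio.file.Files
import org.apache.spark.SparkContext

object CompressedOutputSketch {
  // Saves a tiny RDD of 4-tuples as a compressed object file under outputPath.
  def run(outputPath: String): Unit = {
    // Master URL and app name are placeholders for illustration.
    val sc = new SparkContext("local[2]", "compressed-output-sketch")
    try {
      // Set the Hadoop properties on the context's own Configuration,
      // after the context exists and before the save runs.
      val hc = sc.hadoopConfiguration
      hc.set("mapreduce.output.fileoutputformat.compress", "true")
      // Assumption: DefaultCodec, since Gzip/Snappy may need native Hadoop libraries.
      hc.set("mapreduce.output.fileoutputformat.compress.codec",
        "org.apache.hadoop.io.compress.DefaultCodec")
      hc.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK")

      val rdd = sc.parallelize(Seq((1, 2, 3.0, 4.0), (5, 6, 7.0, 8.0)))
      rdd.saveAsObjectFile(outputPath)
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    // Fresh temp directory so the output path does not already exist.
    val out = Files.createTempDirectory("sketch").resolve("out").toString
    run(out)
    println(s"wrote $out")
  }
}
```

On Hadoop 1 the corresponding keys would be mapred.output.compress, mapred.output.compression.codec and mapred.output.compression.type. Whether the files actually come out smaller depends on the Hadoop version and on which codecs are available on the cluster.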


On Fri, Jan 3, 2014 at 5:43 PM, Andrew Ash <[email protected]> wrote:

> For hadoop properties I find the most reliable way to be to set them on a
> Configuration object and use a method on SparkContext that accepts that
> conf object.
>
> From working code:
>
> import org.apache.hadoop.io.LongWritable
> import org.apache.hadoop.io.Text
> import org.apache.hadoop.conf.Configuration
> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
>
> def nlLZOfile(path: String) = {
>     val conf = new Configuration
>     conf.set("textinputformat.record.delimiter", "\n")
>     sc.newAPIHadoopFile(path,
>         classOf[com.hadoop.mapreduce.LzoTextInputFormat],
>         classOf[LongWritable], classOf[Text], conf)
>       .map(_._2.toString)
> }
>
>
> On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia
> <[email protected]> wrote:
>
>> Thanks for clarifying this.
>>
>> I tried setting hadoop properties before constructing SparkContext, but
>> it had no effect.
>>
>> Where is the right place to set these properties?
>>
>>
>> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <
>> [email protected]> wrote:
>>
>>>  Hi,
>>>
>>> I believe Kryo is only used during RDD serialization (i.e. communication
>>> between nodes), not for saving. If you want to compress the output, you can
>>> use the Gzip or Snappy codec like this:
>>>
>>> // Pick one codec; the two definitions cannot both be active.
>>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for snappy
>>> // val codec = "org.apache.hadoop.io.compress.GzipCodec" // for gzip
>>>
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress",
>>> "true")
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.codec",
>>> codec)
>>> System.setProperty("spark.hadoop.mapreduce.output.fileoutputformat.compress.type",
>>> "BLOCK")
>>>
>>> (That's for HDP2, for HDP1, the keys are different)
>>> Regards
>>> Guillaume
>>>
>>>   Hi,
>>>
>>>  I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double,
>>> Double)], expecting the output binary to be smaller, but it is exactly
>>> the same size as when Kryo is not enabled.
>>>
>>>  I've checked the log, and there is no trace of Kryo-related errors.
>>>
>>>  The code looks something like:
>>>
>>> class MyRegistrator extends KryoRegistrator {
>>>   override def registerClasses(kryo: Kryo) {
>>>     kryo.setRegistrationRequired(true)
>>>     kryo.register(classOf[(Int, Int, Double, Double)])
>>>   }
>>> }
>>> System.setProperty("spark.serializer",
>>> "org.apache.spark.serializer.KryoSerializer")
>>> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>>>
>>>  At the end, I tried to call:
>>>
>>> kryo.setRegistrationRequired(true)
>>>
>>>  to make sure my class gets registered. But I found errors like:
>>>
>>> Exception in thread "DAGScheduler"
>>> com.esotericsoftware.kryo.KryoException:
>>> java.lang.IllegalArgumentException: Class is not registered:
>>> scala.math.Numeric$IntIsIntegral$
>>> Note: To register this class use:
>>> kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>>>
>>>  It appears many Scala internal types have to be registered in order to
>>> have full Kryo support.
>>>
>>>  Any idea why my simple tuple type does not get the Kryo benefits?
>>>
>>>
>>>
>>> --
>>>  *Guillaume PITEL, Président*
>>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>>
>>>  eXenSa S.A.S. <http://www.exensa.com/>
>>>  41, rue Périer - 92120 Montrouge - FRANCE
>>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>>
>>
>>
>
