For Hadoop properties, I find the most reliable approach is to set them on a Configuration object and pass that conf object to one of the SparkContext methods that accepts it.
From working code:
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
def nlLZOfile(path: String) = {
  // Force newline-delimited records regardless of any cluster-wide setting.
  val conf = new Configuration
  conf.set("textinputformat.record.delimiter", "\n")
  sc.newAPIHadoopFile(path,
      classOf[com.hadoop.mapreduce.LzoTextInputFormat],
      classOf[LongWritable], classOf[Text], conf)
    .map(_._2.toString)
}
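For example (hypothetical path; this assumes sc is your live SparkContext and the hadoop-lzo jar is on the classpath):

  val lines = nlLZOfile("hdfs://namenode/data/input.lzo")
  lines.take(5).foreach(println)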
On Fri, Jan 3, 2014 at 12:34 PM, Aureliano Buendia <[email protected]> wrote:
> Thanks for clarifying this.
>
> I tried setting hadoop properties before constructing SparkContext, but it
> had no effect.
>
> Where is the right place to set these properties?
>
>
> On Fri, Jan 3, 2014 at 4:56 PM, Guillaume Pitel <[email protected]> wrote:
>
>> Hi,
>>
>> I believe Kryo is only used during RDD serialization (i.e. communication
>> between nodes), not for saving. If you want to compress the output, you can
>> use the GZip or Snappy codec like this:
>>
>> // Pick one of the two:
>> val codec = "org.apache.hadoop.io.compress.SnappyCodec" // for Snappy
>> val codec = "org.apache.hadoop.io.compress.GzipCodec"   // for Gzip
>>
>> System.setProperty(
>>   "spark.hadoop.mapreduce.output.fileoutputformat.compress", "true")
>> System.setProperty(
>>   "spark.hadoop.mapreduce.output.fileoutputformat.compress.codec", codec)
>> System.setProperty(
>>   "spark.hadoop.mapreduce.output.fileoutputformat.compress.type", "BLOCK")
>>
>> (That's for HDP2; for HDP1, the keys are different.)
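>> If it helps, here is my guess at the Hadoop 1 (mapred.*) equivalents;
>> double-check the key names against your distribution:
>>
>> System.setProperty("spark.hadoop.mapred.output.compress", "true")
>> System.setProperty("spark.hadoop.mapred.output.compression.codec", codec)
>> System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")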
>> Regards
>> Guillaume
>>
>> Hi,
>>
>> I'm trying to call saveAsObjectFile() on an RDD[(Int, Int, Double,
>> Double)], expecting the output binary to be smaller, but it is exactly
>> the same size as when Kryo is not enabled.
>>
>> I've checked the logs, and there is no trace of Kryo-related errors.
>>
>> The code looks something like:
>>
>> class MyRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo) {
>>     kryo.setRegistrationRequired(true)
>>     kryo.register(classOf[(Int, Int, Double, Double)])
>>   }
>> }
>>
>> System.setProperty("spark.serializer",
>>   "org.apache.spark.serializer.KryoSerializer")
>> System.setProperty("spark.kryo.registrator", "MyRegistrator")
>>
>> At the end, I tried to call:
>>
>> kryo.setRegistrationRequired(true)
>>
>> to make sure my class gets registered. But I found errors like:
>>
>> Exception in thread "DAGScheduler"
>> com.esotericsoftware.kryo.KryoException:
>> java.lang.IllegalArgumentException: Class is not registered:
>> scala.math.Numeric$IntIsIntegral$
>> Note: To register this class use:
>> kryo.register(scala.math.Numeric$IntIsIntegral$.class);
>>
>> It appears many Scala internal types have to be registered in order to
>> get full Kryo support, e.g. along the lines of the sketch below.
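>>
>> For instance, a sketch of what registering it might look like (the module
>> class can't be named directly in Scala source, so it has to be looked up
>> reflectively; this is my untested guess at a workaround):
>>
>> // Register the class named in the exception message above.
>> kryo.register(Class.forName("scala.math.Numeric$IntIsIntegral$"))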
>>
>> Any idea why my simple tuple type doesn't get the Kryo benefits?
>>
>>
>>
>> --
>> Guillaume PITEL, Président
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>
