It seems that saveAsObjectFile(), saveAsSequenceFile() and saveAsHadoopFile() are written in a rather dirty and inconsistent way.
saveAsObjectFile calls saveAsSequenceFile, but does not pass the codec:
def saveAsObjectFile(path: String) {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
so the codec is set to None:
def saveAsSequenceFile(path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None) {
  ...
  self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
  ...
}
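Note that when called directly, saveAsSequenceFile does compress if the codec is passed explicitly (it just forwards it to saveAsHadoopFile). A minimal sketch, assuming a pair RDD and hadoop's GzipCodec:

  import org.apache.hadoop.io.compress.GzipCodec

  pairs.saveAsSequenceFile("/tmp/out", Some(classOf[GzipCodec]))

It is only the saveAsObjectFile path that offers no way to request a codec.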
saveAsHadoopFile only applies compression when a codec is explicitly passed, and it does not seem to respect the global hadoop compression properties:
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None) {
  conf.setOutputKeyClass(keyClass)
  conf.setOutputValueClass(valueClass)
  // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
  conf.set("mapred.output.format.class", outputFormatClass.getName)
  for (c <- codec) {
    conf.setCompressMapOutput(true)
    conf.set("mapred.output.compress", "true")
    conf.setMapOutputCompressorClass(c)
    conf.set("mapred.output.compression.codec", c.getCanonicalName)
    conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
  }
  conf.setOutputCommitter(classOf[FileOutputCommitter])
  FileOutputFormat.setOutputPath(conf,
    SparkHadoopWriter.createPathFromString(path, conf))
  saveAsHadoopDataset(conf)
}
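Until this is cleaned up, one workaround might be to replicate saveAsObjectFile's body and pass the codec down explicitly. A sketch only, based on the 0.8.1 code above; note that Utils.serialize is spark-private, so this would have to live in an org.apache.spark package (or use your own serialization):

  import org.apache.hadoop.io.{BytesWritable, NullWritable}
  import org.apache.hadoop.io.compress.GzipCodec

  // Same grouping and serialization as saveAsObjectFile, but with an
  // explicit codec forwarded to saveAsSequenceFile:
  rdd.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path, Some(classOf[GzipCodec]))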
On Fri, Jan 3, 2014 at 9:49 PM, Aureliano Buendia <[email protected]> wrote:
>
> On Fri, Jan 3, 2014 at 8:25 PM, Guillaume Pitel
> <[email protected]> wrote:
>
>> Have you tried with the mapred.* properties? If saveAsObjectFile uses
>> saveAsSequenceFile, maybe it uses the old API?
>>
>
> Neither the spark.hadoop.mapred.* nor the spark.hadoop.mapreduce.* approach
> causes compression with saveAsObjectFile. (Using Spark 0.8.1)
>
>
>>
>> Guillaume
>>
>> But why is it that hadoop compression doesn't work for saveAsObjectFile(),
>> but does work (according to Guillaume) for saveAsHadoopFile()?
>>
>>
>> --
>> Guillaume PITEL, Président
>> +33(0)6 25 48 86 80 / +33(0)9 70 44 67 53
>>
>> eXenSa S.A.S. <http://www.exensa.com/>
>> 41, rue Périer - 92120 Montrouge - FRANCE
>> Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05
>>
>
>
