It seems saveAsObjectFile(), saveAsSequenceFile() and saveAsHadoopFile()
handle compression in a rather messy and inconsistent way.

saveAsObjectFile calls saveAsSequenceFile, but does not pass the codec:

def saveAsObjectFile(path: String) {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

so the codec is set to None:

def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
    ...
      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
    ...
  }
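
A workaround, until this is cleaned up, seems to be replicating saveAsObjectFile by hand so that a codec can be passed through to saveAsSequenceFile explicitly. A rough, untested sketch against the 0.8-era API (the serialize helper and the GzipCodec choice are just placeholders, since Spark's internal Utils.serialize is not public):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// Plain Java serialization, standing in for Spark's internal Utils.serialize
def serialize(o: AnyRef): Array[Byte] = {
  val bos = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bos)
  oos.writeObject(o)
  oos.close()
  bos.toByteArray
}

// Same shape as saveAsObjectFile, but with an explicit codec on the last step
def saveAsCompressedObjectFile[T: ClassManifest](rdd: RDD[T], path: String) {
  rdd.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(serialize(x))))
    .saveAsSequenceFile(path, Some(classOf[GzipCodec]))
}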

saveAsHadoopFile only applies compression when a codec is passed explicitly, and
it does not seem to respect the global hadoop compression properties:

def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None) {
    conf.setOutputKeyClass(keyClass)
    conf.setOutputValueClass(valueClass)
    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
    conf.set("mapred.output.format.class", outputFormatClass.getName)
    for (c <- codec) {
      conf.setCompressMapOutput(true)
      conf.set("mapred.output.compress", "true")
      conf.setMapOutputCompressorClass(c)
      conf.set("mapred.output.compression.codec", c.getCanonicalName)
      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }
    conf.setOutputCommitter(classOf[FileOutputCommitter])
    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
    saveAsHadoopDataset(conf)
  }
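
So, unless the codec argument is passed explicitly, nothing along this path turns output compression on. A minimal example of passing it through, assuming an RDD[(String, String)] called pairs, and with the output path, GzipCodec and TextOutputFormat being purely illustrative:

import org.apache.hadoop.io.Text
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.spark.SparkContext._

// The named argument skips the JobConf default; the codec is the only switch
// saveAsHadoopFile looks at for output compression.
pairs.saveAsHadoopFile(
  "hdfs:///tmp/out",
  classOf[Text],
  classOf[Text],
  classOf[TextOutputFormat[Text, Text]],
  codec = Some(classOf[GzipCodec]))

Alternatively, since everything ends in saveAsHadoopDataset(conf), it should also be possible to set mapred.output.compress and mapred.output.compression.codec on a JobConf yourself and call saveAsHadoopDataset directly, bypassing the codec argument altogether.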



On Fri, Jan 3, 2014 at 9:49 PM, Aureliano Buendia <[email protected]> wrote:

>
>
>
> On Fri, Jan 3, 2014 at 8:25 PM, Guillaume Pitel <
> [email protected]> wrote:
>
>>  Have you tried with the mapred.* properties? If saveAsObjectFile uses
>> saveAsSequenceFile, maybe it uses the old API?
>>
>
> Neither the spark.hadoop.mapred.* nor the spark.hadoop.mapreduce.* approach
> causes compression with saveAsObjectFile. (Using Spark 0.8.1)
>
>
>>
>> Guillaume
>>
>>   But why is it that hadoop compression doesn't work for saveAsObjectFile(),
>> but it does work (according to Guillaume) for saveAsHadoopFile()?
>>
>>
>>
>
>
