Re: hadoop input/output format advanced control
I am trying the very same thing to configure the min split size with Spark 1.3.1, and I get a compilation error.

Code:

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
    sc.newAPIHadoopFile(path + "/*.avro",
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]],
      hadoopConfiguration)

Error:

    [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: inferred type arguments [org.apache.hadoop.io.NullWritable,org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord],org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]]
    [INFO] sc.newAPIHadoopFile(path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
    [INFO] ^

Hence I modified the code to:

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
    sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
      classOf[AvroKey[GenericRecord]],
      classOf[NullWritable],
      classOf[AvroKeyInputFormat[GenericRecord]],
      hadoopConfiguration)

But I still get an error:

    [ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
    [INFO]   (path: String,fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass: Class[org.apache.hadoop.io.NullWritable],conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] and
    [INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
    [INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
    [INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
    [INFO] ^
    [ERROR] one error found
    [INFO]
    [INFO] BUILD FAILURE
    [INFO] ---
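For what it's worth, the first overload listed in the error takes the class arguments in the order (path, fClass, kClass, vClass, conf), i.e. the InputFormat class first. A call along these lines should at least match that signature; this is only a sketch based on the signature shown in the error above (same Avro classes, same path and 64 MB max split size), not something compiled against this exact build:

    import org.apache.avro.generic.GenericRecord
    import org.apache.avro.mapred.AvroKey
    import org.apache.avro.mapreduce.AvroKeyInputFormat
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.NullWritable

    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
    hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

    // In this overload the InputFormat class comes before the key and value classes.
    val avroRdd = sc.newAPIHadoopFile(
      path + "/*.avro",
      classOf[AvroKeyInputFormat[GenericRecord]], // fClass
      classOf[AvroKey[GenericRecord]],            // kClass
      classOf[NullWritable],                      // vClass
      hadoopConfiguration)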
On Tue, Mar 24, 2015 at 11:46 AM, Nick Pentreath nick.pentre...@gmail.com wrote:
Re: hadoop input/output format advanced control
You can indeed override the Hadoop configuration at a per-RDD level - though it is a little more verbose, as in the example below, and you need to effectively make a copy of the Hadoop Configuration:

    val thisRDDConf = new Configuration(sc.hadoopConfiguration)
    thisRDDConf.set("mapred.min.split.size", "5")

    val rdd = sc.newAPIHadoopFile(path,
      classOf[SequenceFileInputFormat[IntWritable, Text]],
      classOf[IntWritable],
      classOf[Text],
      thisRDDConf
    )
    println(rdd.partitions.size)

    val rdd2 = sc.newAPIHadoopFile(path,
      classOf[SequenceFileInputFormat[IntWritable, Text]],
      classOf[IntWritable],
      classOf[Text]
    )
    println(rdd2.partitions.size)

For example, if I run the above on the following directory (some files I have lying around):

    -rw-r--r--  1 Nick  staff    0B Jul 11  2014 _SUCCESS
    -rw-r--r--  1 Nick  staff  291M Sep 16  2014 part-0
    -rw-r--r--  1 Nick  staff  227M Sep 16  2014 part-1
    -rw-r--r--  1 Nick  staff  370M Sep 16  2014 part-2
    -rw-r--r--  1 Nick  staff  244M Sep 16  2014 part-3
    -rw-r--r--  1 Nick  staff  240M Sep 16  2014 part-4

I get the output:

    15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
    *5*

... and then for the second RDD:

    15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from newAPIHadoopFile at TestHash.scala:41
    *45*

As expected. A more succinct way of passing in those conf options would be nice, but this should get you what you need. (A variant of the same snippet using the new-API split-size key is sketched just after this message.)

On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers ko...@tresata.com wrote:
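As flagged above, the same per-RDD override can be written against the new-API key names. This is only a sketch, assuming Hadoop 2.x, where mapreduce.input.fileinputformat.split.minsize is, as far as I know, the new-API key and mapred.min.split.size is its older alias; the 512 MB minimum split size is an arbitrary value chosen for illustration, and path and the (IntWritable, Text) SequenceFile layout are taken from the example above:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{IntWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

    // Copy the SparkContext's Hadoop configuration so other RDDs are unaffected.
    val thisRDDConf = new Configuration(sc.hadoopConfiguration)
    // New-API key name; 512 MB is an arbitrary illustrative value.
    thisRDDConf.set("mapreduce.input.fileinputformat.split.minsize", (512L * 1024 * 1024).toString)

    val rdd = sc.newAPIHadoopFile(path,
      classOf[SequenceFileInputFormat[IntWritable, Text]],
      classOf[IntWritable],
      classOf[Text],
      thisRDDConf)
    println(rdd.partitions.size) // should reflect the larger minimum split size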
hadoop input/output format advanced control
Currently it's pretty hard to control the Hadoop input/output formats used in Spark. The convention seems to be to add extra parameters to all methods, and then somewhere deep inside the code (for example in PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into settings on the Hadoop Configuration object. For example, for compression I see codec: Option[Class[_ <: CompressionCodec]] = None added to a bunch of methods.

How scalable is this solution really? For example, I need to read from a Hadoop dataset and I don't want the input (part) files to get split up. The way to do this is to set mapred.min.split.size. Now I don't want to set this at the level of the SparkContext (which can be done), since I don't want it to apply to input formats in general; I want it to apply to just this one specific input dataset I need to read. Which leaves me with no options currently. I could go add yet another input parameter to all the methods (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile, etc.), but that seems ineffective.

Why can we not expose a Map[String, String] or some other generic way to manipulate settings for Hadoop input/output formats? It would require adding one more parameter to all methods that deal with Hadoop input/output formats, but after that it's done. One parameter to rule them all. Then I could do:

    val x = sc.textFile("/some/path", formatSettings = Map("mapred.min.split.size" -> "12345"))

or

    rdd.saveAsTextFile("/some/path", formatSettings = Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" -> "somecodec"))
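Until something like that exists, a small user-side helper can approximate the proposed formatSettings parameter by copying the SparkContext's Hadoop configuration per dataset - the same trick as in the reply above. This is just a sketch: textFileWithSettings is a made-up name, not a Spark API, and it reads text through the new-API TextInputFormat.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    // Hypothetical helper: applies per-dataset Hadoop settings without touching
    // the SparkContext-wide configuration.
    def textFileWithSettings(sc: SparkContext,
                             path: String,
                             formatSettings: Map[String, String]): RDD[String] = {
      val conf = new Configuration(sc.hadoopConfiguration) // copy, so other RDDs are unaffected
      formatSettings.foreach { case (k, v) => conf.set(k, v) }
      sc.newAPIHadoopFile(path,
          classOf[TextInputFormat],
          classOf[LongWritable],
          classOf[Text],
          conf)
        .map(pair => pair._2.toString) // keep only the line text, as textFile does
    }

    // Usage, mirroring the example above:
    // val x = textFileWithSettings(sc, "/some/path", Map("mapred.min.split.size" -> "12345"))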