I am trying the very same thing to configure the min split size with Spark 1.3.1, and I get a compilation error.
Code:

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile(path + "/*.avro", classOf[AvroKey[GenericRecord]],
  classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]],
  hadoopConfiguration)

Error:

[ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: inferred type arguments [org.apache.hadoop.io.NullWritable,org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord],org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]] do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <: org.apache.hadoop.mapreduce.InputFormat[K,V]]
[INFO] sc.newAPIHadoopFile(path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
[INFO]    ^

Hence I modified it to:

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](
  path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable],
  classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

But I still get an error:

[ERROR] /Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37: error: overloaded method value newAPIHadoopFile with alternatives:
[INFO]   (path: String,fClass: Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass: Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass: Class[org.apache.hadoop.io.NullWritable],conf: org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)] <and>
[INFO]   (path: String)(implicit km: scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable], implicit fm: scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord], org.apache.hadoop.io.NullWritable)]
[INFO]  cannot be applied to (String, Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]], Class[org.apache.hadoop.io.NullWritable], Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]], org.apache.hadoop.conf.Configuration)
[INFO]     sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable, AvroKeyInputFormat[GenericRecord]](path + "/*.avro", classOf[AvroKey[GenericRecord]], classOf[NullWritable], classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
[INFO]                        ^
[ERROR] one error found
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] -----------------------
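Looking at the first alternative listed in that error, the overload appears to take the path, then the InputFormat class (fClass), then the key and value classes, and finally the Configuration, so presumably my arguments are just in the wrong order. A sketch of the reordered call as I understand it (untested, with the imports I am assuming):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

// fClass (the InputFormat) comes before kClass and vClass, matching the
// (path, fClass, kClass, vClass, conf) alternative shown in the error above.
val avroRdd = sc.newAPIHadoopFile(
  path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConfiguration)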
On Tue, Mar 24, 2015 at 11:46 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> You can indeed override the Hadoop configuration at a per-RDD level,
> though it is a little more verbose, as in the example below, and you need
> to effectively make a copy of the Hadoop Configuration:
>
> val thisRDDConf = new Configuration(sc.hadoopConfiguration)
> thisRDDConf.set("mapred.min.split.size", "500000000")
> val rdd = sc.newAPIHadoopFile(path,
>   classOf[SequenceFileInputFormat[IntWritable, Text]],
>   classOf[IntWritable],
>   classOf[Text],
>   thisRDDConf
> )
> println(rdd.partitions.size)
>
> val rdd2 = sc.newAPIHadoopFile(path,
>   classOf[SequenceFileInputFormat[IntWritable, Text]],
>   classOf[IntWritable],
>   classOf[Text]
> )
> println(rdd2.partitions.size)
>
> For example, if I run the above on the following directory (some files I
> have lying around):
>
> -rw-r--r--  1 Nick  staff    0B Jul 11  2014 _SUCCESS
> -rw-r--r--  1 Nick  staff  291M Sep 16  2014 part-00000
> -rw-r--r--  1 Nick  staff  227M Sep 16  2014 part-00001
> -rw-r--r--  1 Nick  staff  370M Sep 16  2014 part-00002
> -rw-r--r--  1 Nick  staff  244M Sep 16  2014 part-00003
> -rw-r--r--  1 Nick  staff  240M Sep 16  2014 part-00004
>
> I get the output:
>
> 15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
> *5*
>
> ... and then for the second RDD:
>
> 15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from newAPIHadoopFile at TestHash.scala:41
> *45*
>
> As expected.
>
> Though a more succinct way of passing in those conf options would be nice,
> this should get you what you need.
>
> On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> Currently it's pretty hard to control the Hadoop input/output formats used
>> in Spark. The convention seems to be to add extra parameters to all
>> methods, and then somewhere deep inside the code (for example in
>> PairRDDFunctions.saveAsHadoopFile) all these parameters get translated
>> into settings on the Hadoop Configuration object.
>>
>> For example, for compression I see "codec: Option[Class[_ <:
>> CompressionCodec]] = None" added to a bunch of methods.
>>
>> How scalable is this solution, really?
>>
>> For example, I need to read from a Hadoop dataset and I don't want the
>> input (part) files to get split up. The way to do this is to set
>> "mapred.min.split.size". Now I don't want to set this at the level of the
>> SparkContext (which can be done), since I don't want it to apply to input
>> formats in general; I want it to apply to just this one specific input
>> dataset I need to read. That leaves me with no options currently. I could
>> go add yet another input parameter to all the methods
>> (SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
>> etc.), but that seems ineffective.
>>
>> Why can we not expose a Map[String, String] or some other generic way to
>> manipulate settings for Hadoop input/output formats? It would require
>> adding one more parameter to all methods that deal with Hadoop
>> input/output formats, but after that it's done. One parameter to rule
>> them all...
>>
>> Then I could do:
>> val x = sc.textFile("/some/path", formatSettings =
>>   Map("mapred.min.split.size" -> "12345"))
>>
>> or
>> rdd.saveAsTextFile("/some/path", formatSettings =
>>   Map("mapred.output.compress" -> "true",
>>       "mapred.output.compression.codec" -> "somecodec"))

--
Deepak
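As a stopgap for the Map[String, String] idea above, the per-RDD Configuration copy that Nick shows can be wrapped in a small helper. The helper name below is made up (it is not part of any Spark API), and the usage simply mirrors Nick's example:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkContext

// Hypothetical helper, not part of Spark: copy the SparkContext's Hadoop
// configuration and overlay per-RDD settings from a Map.
def hadoopConfWith(sc: SparkContext, settings: Map[String, String]): Configuration = {
  val conf = new Configuration(sc.hadoopConfiguration)
  settings.foreach { case (k, v) => conf.set(k, v) }
  conf
}

// Usage, assuming sc and path as in Nick's example:
val conf = hadoopConfWith(sc, Map("mapred.min.split.size" -> "500000000"))
val rdd = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  conf)
println(rdd.partitions.size)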