I am trying the very same thing to configure the min split size with Spark
1.3.1, and I get a compilation error.
Code:
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile(path + "/*.avro",
classOf[AvroKey[GenericRecord]],
classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]],
hadoopConfiguration)
/code
Error:
[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: inferred type arguments
[org.apache.hadoop.io.NullWritable,org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord],org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]]
do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F :
org.apache.hadoop.mapreduce.InputFormat[K,V]]
[INFO] sc.newAPIHadoopFile(path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
[INFO]^
Hence I modified it to:
Code:
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")
sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
/code
But I still get an error:
[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: overloaded method value newAPIHadoopFile with alternatives:
[INFO] (path: String,fClass:
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
Class[org.apache.hadoop.io.NullWritable],conf:
org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)] and
[INFO] (path: String)(implicit km:
scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
implicit fm:
scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)]
[INFO] cannot be applied to (String,
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
Class[org.apache.hadoop.io.NullWritable],
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
org.apache.hadoop.conf.Configuration)
[INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)
[INFO]^
[ERROR] one error found
[INFO]
[INFO] BUILD FAILURE
[INFO] ---
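For reference, the first overload listed in the error takes its arguments in
the order (path, fClass, kClass, vClass, conf), with the InputFormat class
first, so a call matching that overload would look like the following sketch
(same names as in the snippet above):
Code:
sc.newAPIHadoopFile(path + "/*.avro",
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]],
classOf[NullWritable],
hadoopConfiguration)
/code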
On Tue, Mar 24, 2015 at 11:46 AM, Nick Pentreath nick.pentre...@gmail.com
wrote:
You can indeed override the Hadoop configuration at a per-RDD level -
though it is a little more verbose, as in the example below, and you need
to effectively make a copy of the Hadoop Configuration:
val thisRDDConf = new Configuration(sc.hadoopConfiguration)
thisRDDConf.set("mapred.min.split.size", "500000000")
val rdd = sc.newAPIHadoopFile(path,
classOf[SequenceFileInputFormat[IntWritable, Text]],
classOf[IntWritable],
classOf[Text],
thisRDDConf
)
println(rdd.partitions.size)
val rdd2 = sc.newAPIHadoopFile(path,
classOf[SequenceFileInputFormat[IntWritable, Text]],
classOf[IntWritable],
classOf[Text]
)
println(rdd2.partitions.size)
For example, if I run the above on the following directory (some files I
have lying around):
-rw-r--r-- 1 Nick staff 0B Jul 11 2014 _SUCCESS
-rw-r--r-- 1 Nick staff 291M Sep 16 2014 part-0
-rw-r--r-- 1 Nick staff 227M Sep 16 2014 part-1
-rw-r--r-- 1 Nick staff 370M Sep 16 2014 part-2
-rw-r--r-- 1 Nick staff 244M Sep 16 2014 part-3
-rw-r--r-- 1 Nick staff 240M Sep 16 2014 part-4
I get output:
15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
*5*
... and then for the second RDD:
15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from
newAPIHadoopFile at TestHash.scala:41
*45*
As expected.
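(To see why: with the min split size set to 500000000 bytes, each of the five
data files is smaller than a single split, so FileInputFormat produces one
split per file, i.e. 5 partitions; with the defaults and the local
filesystem's 32MB block size, the roughly 1.3GB of data yields about 45
splits. The empty _SUCCESS file is ignored either way, since the default
input filter hides paths starting with an underscore.)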
Though a more succinct way of passing in those conf options would be nice
- but this should get you what you need.
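As a rough sketch of what a more succinct form could look like, a small
helper can copy the context's configuration and apply the per-RDD overrides
in one call (withHadoopConf below is a hypothetical name, not a Spark API):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkContext
// Hypothetical convenience helper: copy the SparkContext's Hadoop
// configuration and apply per-RDD overrides to the copy.
def withHadoopConf(sc: SparkContext, overrides: (String, String)*): Configuration = {
  val conf = new Configuration(sc.hadoopConfiguration)
  overrides.foreach { case (key, value) => conf.set(key, value) }
  conf
}
val rdd = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  withHadoopConf(sc, "mapred.min.split.size" -> "500000000"))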