Re: hadoop input/output format advanced control

2015-06-26 Thread ๏̯͡๏
I am trying the very same thing to configure the min split size with Spark
1.3.1, and I get a compilation error.

Code:

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile(path + "/*.avro",
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  classOf[AvroKeyInputFormat[GenericRecord]],
  hadoopConfiguration)

/code

Error:

[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: inferred type arguments
[org.apache.hadoop.io.NullWritable,org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord],org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]]
do not conform to method newAPIHadoopFile's type parameter bounds [K,V,F <:
org.apache.hadoop.mapreduce.InputFormat[K,V]]

[INFO] sc.newAPIHadoopFile(path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

[INFO]^


Hence I modified it to:

code

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
  AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
  classOf[AvroKey[GenericRecord]], classOf[NullWritable],
  classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

/code


But I still get an error:

[ERROR]
/Users/dvasthimal/ebay/projects/ep/ep-spark/src/main/scala/com/ebay/ep/poc/spark/reporting/process/util/DataUtil.scala:37:
error: overloaded method value newAPIHadoopFile with alternatives:

[INFO]   (path: String,fClass:
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],kClass:
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],vClass:
Class[org.apache.hadoop.io.NullWritable],conf:
org.apache.hadoop.conf.Configuration)org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)] and

[INFO]   (path: String)(implicit km:
scala.reflect.ClassTag[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
implicit vm: scala.reflect.ClassTag[org.apache.hadoop.io.NullWritable],
implicit fm:
scala.reflect.ClassTag[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]])org.apache.spark.rdd.RDD[(org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord],
org.apache.hadoop.io.NullWritable)]

[INFO]  cannot be applied to (String,
Class[org.apache.avro.mapred.AvroKey[org.apache.avro.generic.GenericRecord]],
Class[org.apache.hadoop.io.NullWritable],
Class[org.apache.avro.mapreduce.AvroKeyInputFormat[org.apache.avro.generic.GenericRecord]],
org.apache.hadoop.conf.Configuration)

[INFO] sc.newAPIHadoopFile[AvroKey[GenericRecord], NullWritable,
AvroKeyInputFormat[GenericRecord]](path + "/*.avro",
classOf[AvroKey[GenericRecord]], classOf[NullWritable],
classOf[AvroKeyInputFormat[GenericRecord]], hadoopConfiguration)

[INFO]^

[ERROR] one error found

[INFO]


[INFO] BUILD FAILURE

[INFO] ---
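Reading the first alternative the compiler lists, newAPIHadoopFile wants the
InputFormat class before the key and value classes (path, fClass, kClass,
vClass, conf), whereas I am passing the key and value classes first. So
presumably the call needs to be reordered along these lines (untested sketch;
avroRecords is just a placeholder name):

val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.set("mapreduce.input.fileinputformat.split.maxsize", "67108864")

// Argument order per the reported signature: fClass, then kClass, then vClass, then conf.
val avroRecords = sc.newAPIHadoopFile(path + "/*.avro",
  classOf[AvroKeyInputFormat[GenericRecord]],
  classOf[AvroKey[GenericRecord]],
  classOf[NullWritable],
  hadoopConfiguration)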

On Tue, Mar 24, 2015 at 11:46 AM, Nick Pentreath nick.pentre...@gmail.com
wrote:


Re: hadoop input/output format advanced control

2015-03-24 Thread Nick Pentreath
You can indeed override the Hadoop configuration at a per-RDD level - though
it is a little more verbose, as in the example below, and you need to
effectively make a copy of the Hadoop Configuration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat

// Copy the context-level Hadoop configuration and override the split size only for this RDD.
val thisRDDConf = new Configuration(sc.hadoopConfiguration)
thisRDDConf.set("mapred.min.split.size", "5")
val rdd = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  thisRDDConf
)
println(rdd.partitions.size)

// The same read without the per-RDD configuration picks up the context-level defaults.
val rdd2 = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text]
)
println(rdd2.partitions.size)


For example, if I run the above on the following directory (some files I
have lying around):

-rw-r--r--  1 Nick  staff 0B Jul 11  2014 _SUCCESS
-rw-r--r--  1 Nick  staff   291M Sep 16  2014 part-0
-rw-r--r--  1 Nick  staff   227M Sep 16  2014 part-1
-rw-r--r--  1 Nick  staff   370M Sep 16  2014 part-2
-rw-r--r--  1 Nick  staff   244M Sep 16  2014 part-3
-rw-r--r--  1 Nick  staff   240M Sep 16  2014 part-4

I get output:

15/03/24 20:43:12 INFO FileInputFormat: Total input paths to process : 5
*5*

... and then for the second RDD:

15/03/24 20:43:12 INFO SparkContext: Created broadcast 1 from
newAPIHadoopFile at TestHash.scala:41
*45*

As expected.

A more succinct way of passing in those conf options would be nice, but this
should get you what you need.
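
If you want something slightly more succinct, you could wrap the copy-and-set
dance in a small helper along these lines (just a sketch; perRDDConf is a
made-up name, not anything in Spark):

import org.apache.spark.SparkContext

def perRDDConf(sc: SparkContext, overrides: Map[String, String]): Configuration = {
  // Copy the context-level Hadoop configuration and apply the per-RDD overrides to the copy.
  val conf = new Configuration(sc.hadoopConfiguration)
  overrides.foreach { case (k, v) => conf.set(k, v) }
  conf
}

val rdd3 = sc.newAPIHadoopFile(path,
  classOf[SequenceFileInputFormat[IntWritable, Text]],
  classOf[IntWritable],
  classOf[Text],
  perRDDConf(sc, Map("mapred.min.split.size" -> "5")))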



On Mon, Mar 23, 2015 at 10:36 PM, Koert Kuipers ko...@tresata.com wrote:


hadoop input/output format advanced control

2015-03-23 Thread Koert Kuipers
Currently it's pretty hard to control the Hadoop input/output formats used
in Spark. The convention seems to be to add extra parameters to all
methods, and then somewhere deep inside the code (for example in
PairRDDFunctions.saveAsHadoopFile) all these parameters get translated into
settings on the Hadoop Configuration object.

For example, for compression I see codec: Option[Class[_ <:
CompressionCodec]] = None added to a bunch of methods.

How scalable is this solution, really?

For example, I need to read from a Hadoop dataset and I don't want the input
(part) files to get split up. The way to do this is to set
mapred.min.split.size. Now, I don't want to set this at the level of the
SparkContext (which can be done), since I don't want it to apply to input
formats in general; I want it to apply to just this one specific input
dataset I need to read. Which leaves me with no options currently. I could
go add yet another input parameter to all the methods
(SparkContext.textFile, SparkContext.hadoopFile, SparkContext.objectFile,
etc.), but that seems ineffective.

Why can we not expose a Map[String, String] or some other generic way to
manipulate settings for Hadoop input/output formats? It would require
adding one more parameter to all methods that deal with Hadoop input/output
formats, but after that it's done. One parameter to rule them all.

Then I could do:

val x = sc.textFile("/some/path", formatSettings =
  Map("mapred.min.split.size" -> "12345"))

or

rdd.saveAsTextFile("/some/path", formatSettings =
  Map("mapred.output.compress" -> "true", "mapred.output.compression.codec" ->
  somecodec))
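
To make the idea concrete: something like the formatSettings parameter above
could even be prototyped today in user code with an implicit class. This is
only a sketch of the shape of the API; none of it exists in Spark:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object FormatSettings {
  implicit class RichSparkContext(val sc: SparkContext) extends AnyVal {
    // textFile variant that applies arbitrary Hadoop settings to just this one read.
    def textFile(path: String, formatSettings: Map[String, String]): RDD[String] = {
      val conf = new Configuration(sc.hadoopConfiguration)
      formatSettings.foreach { case (k, v) => conf.set(k, v) }
      sc.newAPIHadoopFile(path,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        conf).map(_._2.toString)
    }
  }
}

// usage, e.g.:
// import FormatSettings._
// val x = sc.textFile("/some/path", Map("mapred.min.split.size" -> "12345"))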