Did you try sc.binaryFiles()? It gives you an RDD of (path, PortableDataStream)
pairs, where the PortableDataStream wraps the underlying bytes.
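
For reference, a minimal sketch of that approach (the object name and the
size printing at the end are just placeholders; the HDFS path is taken from
your snippet below):

import org.apache.spark.{SparkConf, SparkContext}

object BinaryFilesExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("BinaryFilesExample")
    val sc = new SparkContext(conf)

    // binaryFiles yields one (path, PortableDataStream) record per input file
    val rdd = sc.binaryFiles("hdfs:///user/name/MyDataFile.dat")

    val sizes = rdd.map { case (path, stream) =>
      val bytes = stream.toArray()   // materialise the file contents as Array[Byte]
      // hand `bytes` to the existing business logic here
      (path, bytes.length)
    }.collect()

    sizes.foreach { case (path, len) => println(s"$path -> $len bytes") }
    sc.stop()
  }
}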

On Tue, Oct 27, 2015 at 10:23 PM, Balachandar R.A. <balachandar...@gmail.com> wrote:

> Hello,
>
>
> I have developed a Hadoop-based solution that processes a binary file. It
> uses the classic Hadoop MR technique. The binary file is about 10 GB and is
> divided into 73 HDFS blocks, and the business logic, written as a map
> process, operates on each of these 73 blocks. We have developed a
> customInputFormat and CustomRecordReader in Hadoop that return a key
> (IntWritable) and a value (BytesWritable) to the map function. The value is
> nothing but the contents of an HDFS block (binary data). The business logic
> knows how to read this data.
>
> Now, I would like to port this code to Spark. I am a beginner with Spark and
> could run the simple examples (word count, pi) in Spark. However, I could not
> find a straightforward example of processing binary files in Spark. I see
> two possible solutions for this use case. The first is to avoid using the
> custom input format and record reader: find a method (approach) in Spark
> that creates an RDD for those HDFS blocks, and use a map-like method that
> feeds each HDFS block's content to the business logic. If this is not
> possible, I would like to re-use the custom input format and custom record
> reader through methods such as the Hadoop API, HadoopRDD, etc. My problem: I
> do not know whether the first approach is possible or not. If it is, can
> anyone please provide some pointers to examples? I was trying the second
> approach but have been highly unsuccessful. Here is the code snippet I used:
>
> import org.apache.hadoop.io.{BytesWritable, IntWritable}
> import org.apache.spark.{SparkConf, SparkContext}
>
> object Driver {
>   def myFunc(key: IntWritable, content: BytesWritable) = {
>     println("my business logic")
>     // printing the key and content here shows value/size is 0
>   }
>
>   def main(args: Array[String]) {
>     // create a spark context
>     val conf = new SparkConf().setAppName("Dummy").setMaster("spark://<host>:7077")
>     val sc = new SparkContext(conf)
>     // RandomAccessInputFormat is the custom InputFormat described above
>     val rd = sc.newAPIHadoopFile("hdfs:///user/name/MyDataFile.dat",
>       classOf[RandomAccessInputFormat], classOf[IntWritable],
>       classOf[BytesWritable])
>     val count = rd.map(x => myFunc(x._1, x._2)).collect()
>   }
> }
>
> Can someone tell me where I am going wrong here? I think I am not using the
> API the right way, but I failed to find documentation/usage examples.
>
>
> Thanks in advance
>
> - bala
>



-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan India ICT)
