I got this to run, maybe in a tad twisted way. Here is what I did to get to
read Lzo compressed Protobufs in spark (I'm on 0.8.0) :
- I added hadoop's conf folder to spark classpath (in spark-env.sh) in all the
nodes and the shell as well - but that didn't help either. So I just added the
property in configuration manually :
val conf = new Job().getConfiguration
conf.set("io.compression.codecs","com.hadoop.compression.lzo.LzopCodec")
val logRecord = sc.newAPIHadoopFile(
filepath,classOf[...],classOf[...],classOf[...], conf)
This seem to resolve the "No codec found" problem below
- I use twitter's ElephantBird to read lzo compressed protobufs using
MultiInputFormat and read the data out as BinaryWritable. The only additional
thing I had to do was to set the classConf in MutiInputFormat class.
import com.twitter.elephantbird.mapreduce.input.MultiInputFormat
import com.twitter.elephantbird.mapreduce.io.BinaryWritable
MultiInputFormat.setClassConf(classOf[MyProtoClass],conf)
val record = sc.newAPIHadoopFile(
inputpath,classOf[MultiInputFormat[MyProtoClass]],classOf[LongWritable],classOf[BinaryWritable[MyProtoClass]],
conf)
//this gets you the protobuf from BinaryWritable - thereafter you just follow
your class structure
val protobuf = record.map(_._2.get.getProtobuf)
Hope this helps whoever is working with lzo compressed protobufs
~Vipul
On Jan 22, 2014, at 2:09 PM, Vipul Pandey <[email protected]> wrote:
> Issac,
>
> I have all these entries in my core-site.xml and as I mentioned before my Pig
> jobs are running just fine. And the JAVA_LIBRARY_PATH already points to the
> lzo lib directory.
> Not sure what to change/add and where.
>
> Thanks,
> Vipul
>
>
>
> On Jan 22, 2014, at 1:37 PM, Issac Buenrostro <[email protected]> wrote:
>
>> You need a core-site.xml file in the classpath with these lines
>>
>> <?xml version="1.0"?>
>> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
>>
>> <configuration>
>>
>> <property>
>> <name>io.compression.codecs</name>
>>
>> <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
>> </property>
>> <property>
>> <name>io.compression.codec.lzo.class</name>
>> <value>com.hadoop.compression.lzo.LzoCodec</value>
>> </property>
>>
>> </configuration>
>>
>>
>> I also added both the native libraries path and the path to lzoc library to
>> JAVA_LIBRARY_PATH, but I don't know if this is necessary. This is the
>> command I used in mac:
>>
>> export
>> JAVA_LIBRARY_PATH=/Users/*/hadoop-lzo/target/native/Mac_OS_X-x86_64-64/lib:/usr/local/Cellar/lzo/2.06/lib
>>
>>
>> On Wed, Jan 22, 2014 at 12:28 PM, Vipul Pandey <[email protected]> wrote:
>>
>>> Have you tried looking at the HBase and Cassandra examples under the spark
>>> example project? These use custom InputFormats and may provide guidance as
>>> to how to go about using the relevant Protobuf inputformat.
>>
>>
>> Thanks for the pointer Nick, I will look at it once I get past the LZO
>> stage.
>>
>>
>> Issac,
>>
>> How did you get Spark to use the LZO native libraries. I have a fully
>> functional hadoop deployment with pig and scalding crunching the lzo files.
>> But even after adding the lzo library folder to SPARK_CLASSPATH I get the
>> following error :
>>
>> java.io.IOException: No codec for file
>> hdfs://abc.xxx.com:8020/path/to/lzo/file.lzo found, cannot run
>> at
>> com.twitter.elephantbird.mapreduce.input.LzoRecordReader.initialize(LzoRecordReader.java:80)
>> at
>> org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:86)
>>
>>
>>
>> Thanks
>> Vipul
>>
>> On Jan 21, 2014, at 9:32 AM, Issac Buenrostro <[email protected]> wrote:
>>
>>> Hi Vipul,
>>>
>>> I use something like this to read from LZO compressed text files, it may be
>>> helpful:
>>>
>>> import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat
>>> import org.apache.hadoop.io.{LongWritable, Text}
>>> import org.apache.hadoop.mapreduce.Job
>>>
>>> val sc = new SparkContext(sparkMaster, "lzoreader", sparkDir,
>>> List(config.getString("spark.jar")))
>>> sc.newAPIHadoopFile(logFile,classOf[LzoTextInputFormat],classOf[LongWritable],classOf[Text],
>>> new Job().getConfiguration()).map(line => line._2)
>>>
>>> Additionally I had to compile LZO native libraries, so keep that in mind.
>>>
>>>
>>> On Tue, Jan 21, 2014 at 6:57 AM, Nick Pentreath <[email protected]>
>>> wrote:
>>> Hi Vipul
>>>
>>> Have you tried looking at the HBase and Cassandra examples under the spark
>>> example project? These use custom InputFormats and may provide guidance as
>>> to how to go about using the relevant Protobuf inputformat.
>>>
>>>
>>>
>>>
>>> On Mon, Jan 20, 2014 at 11:48 PM, Vipul Pandey <[email protected]> wrote:
>>> Any suggestions, anyone?
>>> Core team / contributors / spark-developers - any thoughts?
>>>
>>> On Jan 17, 2014, at 4:45 PM, Vipul Pandey <[email protected]> wrote:
>>>
>>>> Hi All,
>>>>
>>>> Can someone please share (sample) code to read lzo compressed protobufs
>>>> from hdfs (using elephant bird)? I'm trying whatever I see in the forum
>>>> and on the web but it doesn't seem comprehensive to me.
>>>>
>>>> I'm using Spark0.8.0 . My pig scripts are able to read protobuf just fine
>>>> so the hadoop layer is setup alright. It will be really helpful if
>>>> someone can list out what needs to be done with/in spark.
>>>>
>>>> ~Vipul
>>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> --
>>> Issac Buenrostro
>>> Software Engineer |
>>> [email protected] | (617) 997-3350
>>> www.ooyala.com | blog | @ooyala
>>
>>
>>
>>
>> --
>> --
>> Issac Buenrostro
>> Software Engineer |
>> [email protected] | (617) 997-3350
>> www.ooyala.com | blog | @ooyala
>