Re: avro + parquet + vector<string> + NullPointerException while reading

2014-11-06 Thread Michael Albert
Thanks for the advice!
What seems to work for me is to define the array type as:

    "type": { "type": "array", "items": "string", "java-class": "java.util.ArrayList" }

Without the java-class hint, it seems to be creating an avro.Generic.List, which Spark doesn't know how to serialize, instead of a guava.util.List, which Spark likes.

Hive at 0.13.1 still can't read it, though...

Thanks!
-Mike
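For reference, the full schema with that annotation would presumably look like the one embedded below. This is a minimal sketch, assuming the Avro Java library is on the classpath; the object name SchemaWithJavaClass is made up for illustration. It only checks that the java-class property survives parsing, not that Spark or Hive can then read the resulting files.

```scala
import org.apache.avro.Schema

// Hypothetical helper object; only the schema content comes from the thread.
object SchemaWithJavaClass {
  val json: String =
    """{
      |  "name": "TestStruct",
      |  "namespace": "bughunt",
      |  "type": "record",
      |  "fields": [
      |    {
      |      "name": "string_array",
      |      "type": { "type": "array", "items": "string",
      |                "java-class": "java.util.ArrayList" }
      |    }
      |  ]
      |}""".stripMargin

  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(json)
    // Look up the array schema of the single field and print its custom property.
    val arraySchema = schema.getField("string_array").schema()
    println(arraySchema.getType)
    println(arraySchema.getProp("java-class"))
  }
}
```

The generated TestStruct class would need to be rebuilt from the modified schema before rewriting the data.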

  From: Michael Armbrust mich...@databricks.com
 To: Michael Albert m_albert...@yahoo.com 
Cc: user@spark.apache.org user@spark.apache.org 
 Sent: Tuesday, November 4, 2014 2:37 PM
 Subject: Re: avro + parquet + vector<string> + NullPointerException while reading
   
You might consider using the native parquet support built into Spark SQL 
instead of using the raw library: 
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
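As a rough sketch of what that suggestion might look like for the data in this thread, assuming the Spark 1.1-era SchemaRDD API (parquetFile, saveAsParquetFile, registerTempTable) described on the linked page. The case class TestRow, the object name, the table name, and the default path are placeholders, not from the thread, and this approach bypasses the Avro-generated TestStruct class entirely.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical stand-in for the Avro-generated TestStruct.
case class TestRow(string_array: Seq[String])

object NativeParquetSketch {
  def main(args: Array[String]): Unit = {
    // Placeholder path; pass the real one as the first argument.
    val path = if (args.nonEmpty) args(0) else "/tmp/testdata-native"

    val sc = new SparkContext(new SparkConf().setAppName("NativeParquetSketch"))
    val sqlContext = new SQLContext(sc)
    // Implicit conversion from an RDD of case classes to a SchemaRDD (Spark 1.1).
    import sqlContext.createSchemaRDD

    val rows = sc.parallelize(Seq(
      TestRow(Seq("apple", "banana", "cherry")),
      TestRow(Seq("car", "boat", "plane"))))

    // Write with Spark SQL's built-in Parquet support (the path must not exist yet).
    rows.saveAsParquetFile(path)

    // Read it back and query it through a temporary table.
    val loaded = sqlContext.parquetFile(path)
    loaded.registerTempTable("test_struct")
    sqlContext.sql("SELECT string_array FROM test_struct").collect().foreach(println)

    sc.stop()
  }
}
```

Later Spark releases replaced these calls with the DataFrame reader and writer API, so the method names above apply only to the 1.1-era version used in this thread.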




avro + parquet + vector<string> + NullPointerException while reading

2014-11-03 Thread Michael Albert

Greetings!

I'm trying to use avro and parquet with the following schema:

{
    "name": "TestStruct",
    "namespace": "bughunt",
    "type": "record",
    "fields": [
        {
            "name": "string_array",
            "type": { "type": "array", "items": "string" }
        }
    ]
}
The writing process seems to be OK, but when I try to read it with Spark, I get:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
 at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
 at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

When I try to read it with Hive, I get this:

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable

This leads me to suspect that it might be related to https://github.com/Parquet/parquet-mr/issues/281, but that issue seems to be Hive-specific, and here Spark itself cannot read the data it claims to have written.

I'm running on an Amazon EMR cluster using Hadoop 2.4.0 and Spark 1.1.0. Has anyone else observed this sort of behavior?

For completeness, here is the code that writes the data:

package bughunt

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat

import java.util.ArrayList

object GenData {
    val outputPath = "/user/x/testdata"
    val words = List(
                    List("apple", "banana", "cherry"),
                    List("car", "boat", "plane"),
                    List("lion", "tiger", "bear"),
                    List("north", "south", "east", "west"),
                    List("up", "down", "left", "right"),
                    List("red", "green", "blue"))

    def main(args: Array[String]) {
        val conf = new SparkConf(true)
                    .setAppName("IngestLoanApplicattion")
                    //.set("spark.kryo.registrator",
                    //            classOf[CommonRegistrator].getName)
                    .set("spark.serializer",
                            "org.apache.spark.serializer.KryoSerializer")
                    .set("spark.kryoserializer.buffer.mb", 4.toString)
                    .set("spark.kryo.referenceTracking", "false")

        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(words)
        val job = new Job(sc.hadoopConfiguration)

        // Write records through parquet-avro using the generated schema.
        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job,
                    TestStruct.SCHEMA$)

        // Copy each Scala list into a java.util.ArrayList and wrap it in a TestStruct.
        rdd.map(p => {
                    val xs = new java.util.ArrayList[String]
                    for (z <- p) { xs.add(z) }
                    val bldr = TestStruct.newBuilder()
                    bldr.setStringArray(xs)
                    // The key is unused (Void), so it is null.
                    (null, bldr.build()) })
           .saveAsNewAPIHadoopFile(outputPath,
                classOf[Void],
                classOf[TestStruct],
                classOf[ParquetOutputFormat[TestStruct]],
                job.getConfiguration)
    }
}
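
The hand-written loop in the map closure above could likely be replaced by the standard collection converters. A minimal sketch, assuming Scala 2.10 or 2.11 (where scala.collection.JavaConverters is the usual import); the object name ListConversion and the helper toJavaList are hypothetical:

```scala
import scala.collection.JavaConverters._

object ListConversion {
  // Copy a Scala sequence into a fresh, mutable java.util.ArrayList.
  def toJavaList(xs: Seq[String]): java.util.ArrayList[String] =
    new java.util.ArrayList[String](xs.asJava)

  def main(args: Array[String]): Unit = {
    val xs = toJavaList(List("apple", "banana", "cherry"))
    println(xs)
  }
}
```

Inside the closure this would reduce the list-building lines to something like bldr.setStringArray(toJavaList(p)), with the same result as the explicit loop.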

To read the data, I use this sort of code from the spark-shell:

:paste

import bughunt.TestStruct

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
    val job = new Job(sc.hadoopConfiguration)

    // Read records back through parquet-avro as TestStruct instances.
    ParquetInputFormat.setReadSupportClass(job,
            classOf[AvroReadSupport[TestStruct]])

    sc.newAPIHadoopFile("/user/malbert/testdata",
            classOf[ParquetInputFormat[TestStruct]],
            classOf[Void],
            classOf[TestStruct],
            job.getConfiguration)
}
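
One thing that might be worth trying on the read side is copying the array field into plain Scala strings on the executors before collecting, so that Kryo never has to serialize the Avro record itself. This is an untested sketch, not a confirmed fix. The object name ReadSideConversion and the helper toScalaStrings are hypothetical, and it assumes getStringArray returns a java.util.List of CharSequence values, as Avro-generated classes commonly do.

```scala
import scala.collection.JavaConverters._

object ReadSideConversion {
  // Convert a Java list of CharSequence values (e.g. Avro Utf8) into Scala strings.
  def toScalaStrings[A <: CharSequence](xs: java.util.List[A]): List[String] =
    xs.asScala.map(_.toString).toList

  def main(args: Array[String]): Unit = {
    val javaList = new java.util.ArrayList[CharSequence]()
    javaList.add("lion")
    javaList.add(new java.lang.StringBuilder("tiger"))
    println(toScalaStrings(javaList))

    // Intended use with the RDD from openRddSpecific (not run here):
    //   openRddSpecific(sc)
    //     .map { case (_, rec) => toScalaStrings(rec.getStringArray) }
    //     .collect()
  }
}
```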

I start the Spark shell as follows:

spark-shell \
    --jars ../my-jar-containing-the-class-definitions.jar \
    --conf mapreduce.user.classpath.first=true \
    --conf spark.kryo.referenceTracking=false \
    --conf spark.kryoserializer.buffer.mb=4 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

I'm stumped. I can read and write records and maps, but arrays/vectors elude me. Am I missing something obvious?

Thanks!
Sincerely, Mike Albert