Thanks for the advice!

What seems to work for me is defining the array type as:
    "type": { "type": "array", "items": "string", "java-class": "java.util.ArrayList" }
Without the "java-class" attribute, it seems to create an Avro generic list, which Spark doesn't know how to serialize, instead of a java.util.List, which Spark likes.

Hive at 0.13.1 still can't read it, though...

Thanks!
-Mike
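For reference, a sketch of the full record schema with that attribute added (record and field names taken from the original message quoted below):

{ "name": "TestStruct",
  "namespace": "bughunt",
  "type": "record",
  "fields": [
      { "name": "string_array",
        "type": { "type": "array", "items": "string",
                  "java-class": "java.util.ArrayList" } }
  ]
}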
From: Michael Armbrust mich...@databricks.com
To: Michael Albert m_albert...@yahoo.com
Cc: user@spark.apache.org user@spark.apache.org
Sent: Tuesday, November 4, 2014 2:37 PM
Subject: Re: avro + parquet + vector<string> + NullPointerException while
reading
You might consider using the native parquet support built into Spark SQL
instead of using the raw library:
http://spark.apache.org/docs/latest/sql-programming-guide.html#parquet-files
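A minimal sketch of that approach against the Spark 1.1 SQL API (the table name is hypothetical; the path reuses the output path from the code further down):

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)  // sc is the existing SparkContext
    // The schema is read from the Parquet footers; no Avro-generated classes are involved.
    val rows = sqlContext.parquetFile("/user/x/testdata")
    rows.registerTempTable("test_struct")
    sqlContext.sql("SELECT string_array FROM test_struct").collect().foreach(println)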
On Mon, Nov 3, 2014 at 7:33 PM, Michael Albert m_albert...@yahoo.com.invalid
wrote:
Greetings!
I'm trying to use Avro and Parquet with the following schema:

{ "name": "TestStruct",
  "namespace": "bughunt",
  "type": "record",
  "fields": [
      { "name": "string_array", "type": { "type": "array", "items": "string" } }
  ]
}

The writing process seems to be OK, but when I try to read it with Spark, I get:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
string_array (bughunt.TestStruct)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)

When I try to read it with Hive, I get this:

Failed with exception java.io.IOException:org.apache.hadoop.hive.ql.metadata.HiveException:
java.lang.ClassCastException: org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.ArrayWritable

Which would lead me to suspect that this might be related to this one:
https://github.com/Parquet/parquet-mr/issues/281
but that one seems to be Hive-specific, and I am not seeing Spark read the data it claims to have written itself.
I'm running on an Amazon EMR cluster with Hadoop 2.4.0 and Spark 1.1.0. Has anyone else observed this sort of behavior?
For completeness, here is the code that writes the data:

package bughunt

import org.apache.hadoop.mapreduce.Job

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import parquet.avro.AvroWriteSupport
import parquet.avro.AvroParquetOutputFormat
import parquet.hadoop.ParquetOutputFormat

import java.util.ArrayList

object GenData {
    val outputPath = "/user/x/testdata"
    val words = List(
        List("apple", "banana", "cherry"),
        List("car", "boat", "plane"),
        List("lion", "tiger", "bear"),
        List("north", "south", "east", "west"),
        List("up", "down", "left", "right"),
        List("red", "green", "blue"))

    def main(args: Array[String]) {
        val conf = new SparkConf(true)
            .setAppName("IngestLoanApplicattion")
            //.set("spark.kryo.registrator",
            //     classOf[CommonRegistrator].getName)
            .set("spark.serializer",
                 "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryoserializer.buffer.mb", 4.toString)
            .set("spark.kryo.referenceTracking", "false")

        val sc = new SparkContext(conf)

        val rdd = sc.parallelize(words)

        val job = new Job(sc.hadoopConfiguration)
        ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
        AvroParquetOutputFormat.setSchema(job, TestStruct.SCHEMA$)

        rdd.map(p => {
                val xs = new java.util.ArrayList[String]
                for (z <- p) { xs.add(z) }
                val bldr = TestStruct.newBuilder()
                bldr.setStringArray(xs)
                (null, bldr.build()) })
            .saveAsNewAPIHadoopFile(outputPath,
                classOf[Void],
                classOf[TestStruct],
                classOf[ParquetOutputFormat[TestStruct]],
                job.getConfiguration)
    }
}
To read the data, I use this sort of code from the spark-shell:

:paste
import bughunt.TestStruct

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext

import parquet.hadoop.ParquetInputFormat
import parquet.avro.AvroReadSupport

def openRddSpecific(sc: SparkContext) = {
    val job = new Job(sc.hadoopConfiguration)
    ParquetInputFormat.setReadSupportClass(job,
        classOf[AvroReadSupport[TestStruct]])
    sc.newAPIHadoopFile("/user/malbert/testdata",
        classOf[ParquetInputFormat[TestStruct]],
        classOf[Void],
        classOf[TestStruct],
        job.getConfiguration)
}

I start the Spark shell as follows:

spark-shell \
    --jars ../my-jar-containing-the-class
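A hypothetical way to exercise openRddSpecific from the shell (this call pattern is an assumption, not from the message above; collecting the records is what pushes the Avro objects through the configured Kryo serializer, where the NullPointerException was reported):

    val rdd = openRddSpecific(sc)
    rdd.map(_._2).collect()   // pull the TestStruct records back to the driver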