Github user ericgarcia commented on the pull request:
https://github.com/apache/spark/pull/455#issuecomment-49681825
@rjurney Thanks for your question on Avro and pyspark. @MLnick I used your
code as a starting point and successfully read several different Avro data
types in pyspark instead of just Strings. This could be added to the examples
alongside HBaseConverter.scala and CassandraConverters.scala:
package org.apache.spark.examples.pythonconverters

import scala.collection.JavaConversions._

import org.apache.avro.Schema
import org.apache.avro.Schema.Type._
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey

import org.apache.spark.api.python.Converter

/**
 * Converts an AvroKey[GenericRecord] into a java.util.Map[String, Any],
 * recursively unpacking nested records and arrays so the result can be
 * pickled and used directly from PySpark.
 */
class AvroGenericConverter
  extends Converter[AvroKey[GenericRecord], java.util.Map[String, Any]] {

  override def convert(obj: AvroKey[GenericRecord]): java.util.Map[String, Any] = {
    val record = obj.datum()

    // Turn each field of a record into a (name -> unpacked value) entry.
    def unpackRecord(record: GenericRecord): java.util.Map[String, Any] = {
      mapAsJavaMap(record.getSchema.getFields.map { f =>
        (f.name, unpack(record.get(f.name), f.schema))
      }.toMap)
    }

    // Unpack each element of an array against the array's element schema.
    def unpackArray(value: Any, schema: Schema): java.util.List[Any] = {
      bufferAsJavaList(value.asInstanceOf[java.util.List[Any]].map(v => unpack(v, schema)))
    }

    def unpack(value: Any, schema: Schema): Any = schema.getType match {
      // Avro decodes strings as org.apache.avro.util.Utf8 by default, so a
      // cast to java.lang.String would fail at runtime; use toString instead.
      case STRING => value.toString
      case ENUM   => value.toString
      case LONG   => value.asInstanceOf[java.lang.Long]
      case INT    => value.asInstanceOf[java.lang.Integer]
      case ARRAY  => unpackArray(value, schema.getElementType)
      case RECORD => unpackRecord(value.asInstanceOf[GenericRecord])
      case _      => value.toString // fall back to the string form for unhandled types
    }

    unpackRecord(record)
  }
}
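For reference, here is a minimal PySpark sketch of how the converter could be
invoked, along the lines of this PR's newAPIHadoopFile support. The path is a
placeholder, and it assumes the compiled converter class is on the driver and
executor classpath:

    # Read an Avro file as (key, value) pairs; AvroKeyInputFormat yields
    # AvroKey keys and NullWritable values, and the keyConverter above
    # unpacks each record into a dict-like map for Python.
    avro_rdd = sc.newAPIHadoopFile(
        "/path/to/data.avro",  # placeholder path
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroGenericConverter")
    print(avro_rdd.first())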