Github user ericgarcia commented on the pull request:

    https://github.com/apache/spark/pull/455#issuecomment-49681825
  
    @rjurney Thanks for your question on Avro and pyspark. @MLnick I used your 
code as a starting point to successfully read a few different Avro data types 
in pyspark instead of merely a String. This could be added in with 
HBaseConverter.scala and CassandraConverters.scala
    
        package org.apache.spark.examples.pythonconverters
    
        import org.apache.spark.api.python.Converter
        import org.apache.avro.generic.GenericRecord
        import org.apache.avro.mapred.AvroKey
        import org.apache.avro.mapreduce.AvroKeyInputFormat
        import collection.JavaConversions._
        import org.apache.avro.Schema.Field
        import org.apache.avro.Schema
        import org.apache.avro.Schema.Type._
    
        class AvroGenericConverter extends Converter[AvroKey[GenericRecord], 
java.util.Map[String, Any]] {
          override def convert(obj: AvroKey[GenericRecord]): 
java.util.Map[String, Any] = {
            val record = obj.datum()
            
            def unpackRecord(record: GenericRecord): java.util.Map[String,Any] 
= {
              mapAsJavaMap(record.getSchema.getFields.map( f => (f.name, 
unpack(record.get(f.name), f.schema) ) ).toMap)
            }
            
            def unpackArray(value: Any, schema: Schema): java.util.List[Any] = {
              bufferAsJavaList(value.asInstanceOf[java.util.List[Any]].map( v 
=> unpack(v, schema)))
            }
    
            def unpack(value: Any, schema: Schema): Any = schema.getType match {
              case STRING => value.asInstanceOf[java.lang.String]
              case ENUM => value.toString
              case LONG => value.asInstanceOf[java.lang.Long]
              case INT => value.asInstanceOf[java.lang.Integer]
              case ARRAY => unpackArray(value, schema.getElementType)
              case RECORD => unpackRecord(value.asInstanceOf[GenericRecord])
              case _ => value.toString
            }
    
            unpackRecord(record)
          }
        }


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to