Repository: spark
Updated Branches:
  refs/heads/branch-1.1 f99e4fc80 -> 72e730e98


[SPARK-2736] PySpark converter and example script for reading Avro files

JIRA: https://issues.apache.org/jira/browse/SPARK-2736

This patch includes:
1. An Avro converter that converts Avro data types to Python. It handles all 3 
Avro data mappings (Generic, Specific and Reflect).
2. An example Python script for reading Avro files using AvroKeyInputFormat and 
the converter (a condensed usage sketch is included below).
3. A fix for a classloading issue: Class.forName calls in PythonRDD and 
PythonHadoopUtil are replaced with Utils.classForName, which resolves classes 
via the context or Spark class loader.
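
Condensed usage sketch (distilled from examples/src/main/python/avro_inputformat.py 
added in this patch; assumes the examples jar is on the driver class path and the 
data path is a placeholder):

    from pyspark import SparkContext

    sc = SparkContext(appName="AvroKeyInputFormat")

    # AvroWrapperToJavaConverter unpacks each Avro record into a Java Map,
    # which arrives on the Python side as a dict.
    avro_rdd = sc.newAPIHadoopFile(
        "examples/src/main/resources/users.avro",
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter")

    for record in avro_rdd.map(lambda x: x[0]).collect():
        print record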

cc @MLnick @JoshRosen @mateiz

Author: Kan Zhang <kzh...@apache.org>

Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits:

02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes
f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className
82cc505 [Kan Zhang] [SPARK-2736] Update data sample
0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files
c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models
2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes
536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter

(cherry picked from commit 9422a9b084e3fd5b2b9be2752013588adfb430d0)
Signed-off-by: Matei Zaharia <ma...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e730e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e730e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e730e9

Branch: refs/heads/branch-1.1
Commit: 72e730e9828bb3d88c69a36a241c2e332fca5629
Parents: f99e4fc
Author: Kan Zhang <kzh...@apache.org>
Authored: Thu Aug 14 19:03:51 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Aug 14 19:04:06 2014 -0700

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 .../spark/api/python/PythonHadoopUtil.scala     |   3 +-
 .../org/apache/spark/api/python/PythonRDD.scala |  24 ++--
 .../scala/org/apache/spark/util/Utils.scala     |   3 +
 examples/src/main/python/avro_inputformat.py    |  75 +++++++++++
 examples/src/main/resources/user.avsc           |   8 ++
 examples/src/main/resources/users.avro          | Bin 0 -> 334 bytes
 .../pythonconverters/AvroConverters.scala       | 130 +++++++++++++++++++
 8 files changed, 231 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index bccb043..eaefef1 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -25,6 +25,7 @@ log4j-defaults.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 sorttable.js
+.*avsc
 .*txt
 .*json
 .*data

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
index f3b05e1..49dc95f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
                   defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
-        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+        val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
         logInfo(s"Loaded converter: $cc")
         c
       } match {

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index fefe1cb..9f5c5bd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
-    } yield (Class.forName(k), Class.forName(v))
+    } yield (Utils.classForName(k), Utils.classForName(v))
   }
 
   private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
-    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
   }
 
@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8cac5da..019f68b 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -146,6 +146,9 @@ private[spark] object Utils extends Logging {
     Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
   }
 
+  /** Preferred alternative to Class.forName(className) */
+  def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
+
   /**
    * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/examples/src/main/python/avro_inputformat.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/avro_inputformat.py b/examples/src/main/python/avro_inputformat.py
new file mode 100644
index 0000000..e902ae2
--- /dev/null
+++ b/examples/src/main/python/avro_inputformat.py
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+
+from pyspark import SparkContext
+
+"""
+Read data file users.avro in local Spark distro:
+
+$ cd $SPARK_HOME
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro
+{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
+{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}
+
+To read name and favorite_color fields only, specify the following reader schema:
+
+$ cat examples/src/main/resources/user.avsc
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
+
+$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
+> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
+{u'favorite_color': None, u'name': u'Alyssa'}
+{u'favorite_color': u'red', u'name': u'Ben'}
+"""
+if __name__ == "__main__":
+    if len(sys.argv) != 2 and len(sys.argv) != 3:
+        print >> sys.stderr, """
+        Usage: avro_inputformat <data_file> [reader_schema_file]
+
+        Run with example jar:
+        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
+        Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
+        """
+        exit(-1)
+
+    path = sys.argv[1]
+    sc = SparkContext(appName="AvroKeyInputFormat")
+
+    conf = None
+    if len(sys.argv) == 3:
+        schema_rdd = sc.textFile(sys.argv[2], 1).collect()
+        conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}
+
+    avro_rdd = sc.newAPIHadoopFile(path,
+        "org.apache.avro.mapreduce.AvroKeyInputFormat",
+        "org.apache.avro.mapred.AvroKey",
+        "org.apache.hadoop.io.NullWritable",
+        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
+        conf=conf)
+    output = avro_rdd.map(lambda x: x[0]).collect()
+    for k in output:
+        print k

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/examples/src/main/resources/user.avsc
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/user.avsc b/examples/src/main/resources/user.avsc
new file mode 100644
index 0000000..4995357
--- /dev/null
+++ b/examples/src/main/resources/user.avsc
@@ -0,0 +1,8 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/examples/src/main/resources/users.avro
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/users.avro b/examples/src/main/resources/users.avro
new file mode 100644
index 0000000..27c526a
Binary files /dev/null and b/examples/src/main/resources/users.avro differ

http://git-wip-us.apache.org/repos/asf/spark/blob/72e730e9/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
new file mode 100644
index 0000000..1b25983
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.pythonconverters
+
+import java.util.{Collection => JCollection, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.avro.generic.{GenericFixed, IndexedRecord}
+import org.apache.avro.mapred.AvroWrapper
+import org.apache.avro.Schema
+import org.apache.avro.Schema.Type._
+
+import org.apache.spark.api.python.Converter
+import org.apache.spark.SparkException
+
+
+/**
+ * Implementation of [[org.apache.spark.api.python.Converter]] that converts
+ * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
+ * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
+ */
+class AvroWrapperToJavaConverter extends Converter[Any, Any] {
+  override def convert(obj: Any): Any = {
+    if (obj == null) {
+      return null
+    }
+    obj.asInstanceOf[AvroWrapper[_]].datum() match {
+      case null => null
+      case record: IndexedRecord => unpackRecord(record)
+      case other => throw new SparkException(
+        s"Unsupported top-level Avro data type ${other.getClass.getName}")
+    }
+  }
+
+  def unpackRecord(obj: Any): JMap[String, Any] = {
+    val map = new java.util.HashMap[String, Any]
+    obj match {
+      case record: IndexedRecord =>
+        record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
+          map.put(f.name, fromAvro(record.get(i), f.schema))
+        }
+      case other => throw new SparkException(
+        s"Unsupported RECORD type ${other.getClass.getName}")
+    }
+    map
+  }
+
+  def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
+    obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
+      (key.toString, fromAvro(value, schema.getValueType))
+    }
+  }
+
+  def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
+    unpackBytes(obj.asInstanceOf[GenericFixed].bytes())
+  }
+
+  def unpackBytes(obj: Any): Array[Byte] = {
+    val bytes: Array[Byte] = obj match {
+      case buf: java.nio.ByteBuffer => buf.array()
+      case arr: Array[Byte] => arr
+      case other => throw new SparkException(
+        s"Unknown BYTES type ${other.getClass.getName}")
+    }
+    val bytearray = new Array[Byte](bytes.length)
+    System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
+    bytearray
+  }
+
+  def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
+    case c: JCollection[_] =>
+      c.map(fromAvro(_, schema.getElementType))
+    case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
+      arr.toSeq
+    case arr: Array[_] =>
+      arr.map(fromAvro(_, schema.getElementType)).toSeq
+    case other => throw new SparkException(
+      s"Unknown ARRAY type ${other.getClass.getName}")
+  }
+
+  def unpackUnion(obj: Any, schema: Schema): Any = {
+    schema.getTypes.toList match {
+      case List(s) => fromAvro(obj, s)
+      case List(n, s) if n.getType == NULL => fromAvro(obj, s)
+      case List(s, n) if n.getType == NULL => fromAvro(obj, s)
+      case _ => throw new SparkException(
+        "Unions may only consist of a concrete type and null")
+    }
+  }
+
+  def fromAvro(obj: Any, schema: Schema): Any = {
+    if (obj == null) {
+      return null
+    }
+    schema.getType match {
+      case UNION   => unpackUnion(obj, schema)
+      case ARRAY   => unpackArray(obj, schema)
+      case FIXED   => unpackFixed(obj, schema)
+      case MAP     => unpackMap(obj, schema)
+      case BYTES   => unpackBytes(obj)
+      case RECORD  => unpackRecord(obj)
+      case STRING  => obj.toString
+      case ENUM    => obj.toString
+      case NULL    => obj
+      case BOOLEAN => obj
+      case DOUBLE  => obj
+      case FLOAT   => obj
+      case INT     => obj
+      case LONG    => obj
+      case other   => throw new SparkException(
+        s"Unknown Avro schema type ${other.getName}")
+    }
+  }
+}
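
For reference, on the Python side each converted record surfaces as a plain dict 
(Avro nulls become None, Avro arrays become lists), matching the expected output 
shown in the example script's docstring, e.g.:

    >>> avro_rdd.map(lambda x: x[0]).first()
    {u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}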

