Repository: spark Updated Branches: refs/heads/master 5b922bb45 -> 248232936
[SPARK-3389] Add Converter for ease of Parquet reading in PySpark https://issues.apache.org/jira/browse/SPARK-3389 Author: Uri Laserson <laser...@cloudera.com> Closes #2256 from laserson/SPARK-3389 and squashes the following commits: 0ed363e [Uri Laserson] PEP8'd the python file 0b4b380 [Uri Laserson] Moved converter to examples and added python example eecf4dc [Uri Laserson] [SPARK-3389] Add Converter for ease of Parquet reading in PySpark Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24823293 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24823293 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24823293 Branch: refs/heads/master Commit: 248232936e1bead7f102e59eb8faf3126c582d9d Parents: 5b922bb Author: Uri Laserson <laser...@cloudera.com> Authored: Sat Sep 27 21:48:05 2014 -0700 Committer: Matei Zaharia <ma...@databricks.com> Committed: Sat Sep 27 21:48:05 2014 -0700 ---------------------------------------------------------------------- examples/src/main/python/parquet_inputformat.py | 59 ++++++++++++++ examples/src/main/resources/full_user.avsc | 1 + examples/src/main/resources/users.parquet | Bin 0 -> 615 bytes .../pythonconverters/AvroConverters.scala | 76 +++++++++++-------- 4 files changed, 106 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/python/parquet_inputformat.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/parquet_inputformat.py b/examples/src/main/python/parquet_inputformat.py new file mode 100644 index 0000000..c9b08f8 --- /dev/null +++ b/examples/src/main/python/parquet_inputformat.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Read data file users.parquet in local Spark distro: + +$ cd $SPARK_HOME +$ export AVRO_PARQUET_JARS=/path/to/parquet-avro-1.5.0.jar +$ ./bin/spark-submit --driver-class-path /path/to/example/jar \\ + --jars $AVRO_PARQUET_JARS \\ + ./examples/src/main/python/parquet_inputformat.py \\ + examples/src/main/resources/users.parquet +<...lots of log output...> +{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]} +{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []} +<...more log output...> +""" +if __name__ == "__main__": + if len(sys.argv) != 2: + print >> sys.stderr, """ + Usage: parquet_inputformat.py <data_file> + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar \\ + /path/to/examples/parquet_inputformat.py <data_file> + Assumes you have Parquet data stored in <data_file>. + """ + exit(-1) + + path = sys.argv[1] + sc = SparkContext(appName="ParquetInputFormat") + + parquet_rdd = sc.newAPIHadoopFile( + path, + 'parquet.avro.AvroParquetInputFormat', + 'java.lang.Void', + 'org.apache.avro.generic.IndexedRecord', + valueConverter='org.apache.spark.examples.pythonconverters.IndexedRecordToJavaConverter') + output = parquet_rdd.map(lambda x: x[1]).collect() + for k in output: + print k http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/resources/full_user.avsc ---------------------------------------------------------------------- diff --git a/examples/src/main/resources/full_user.avsc b/examples/src/main/resources/full_user.avsc new file mode 100644 index 0000000..04e7ba2 --- /dev/null +++ b/examples/src/main/resources/full_user.avsc @@ -0,0 +1 @@ +{"type": "record", "namespace": "example.avro", "name": "User", "fields": [{"type": "string", "name": "name"}, {"type": ["string", "null"], "name": "favorite_color"}, {"type": {"items": "int", "type": "array"}, "name": "favorite_numbers"}]} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/resources/users.parquet ---------------------------------------------------------------------- diff --git a/examples/src/main/resources/users.parquet b/examples/src/main/resources/users.parquet new file mode 100644 index 0000000..aa52733 Binary files /dev/null and b/examples/src/main/resources/users.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/24823293/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala index 1b25983..a11890d 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala @@ -30,21 +30,28 @@ import org.apache.spark.api.python.Converter import org.apache.spark.SparkException -/** - * Implementation of [[org.apache.spark.api.python.Converter]] that converts - * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries - * to work with all 3 Avro data mappings (Generic, Specific and Reflect). - */ -class AvroWrapperToJavaConverter extends Converter[Any, Any] { - override def convert(obj: Any): Any = { +object AvroConversionUtil extends Serializable { + def fromAvro(obj: Any, schema: Schema): Any = { if (obj == null) { return null } - obj.asInstanceOf[AvroWrapper[_]].datum() match { - case null => null - case record: IndexedRecord => unpackRecord(record) - case other => throw new SparkException( - s"Unsupported top-level Avro data type ${other.getClass.getName}") + schema.getType match { + case UNION => unpackUnion(obj, schema) + case ARRAY => unpackArray(obj, schema) + case FIXED => unpackFixed(obj, schema) + case MAP => unpackMap(obj, schema) + case BYTES => unpackBytes(obj) + case RECORD => unpackRecord(obj) + case STRING => obj.toString + case ENUM => obj.toString + case NULL => obj + case BOOLEAN => obj + case DOUBLE => obj + case FLOAT => obj + case INT => obj + case LONG => obj + case other => throw new SparkException( + s"Unknown Avro schema type ${other.getName}") } } @@ -103,28 +110,37 @@ class AvroWrapperToJavaConverter extends Converter[Any, Any] { "Unions may only consist of a concrete type and null") } } +} - def fromAvro(obj: Any, schema: Schema): Any = { +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts + * an Avro IndexedRecord (e.g., derived from AvroParquetInputFormat) to a Java Map. + */ +class IndexedRecordToJavaConverter extends Converter[IndexedRecord, JMap[String, Any]]{ + override def convert(record: IndexedRecord): JMap[String, Any] = { + if (record == null) { + return null + } + val map = new java.util.HashMap[String, Any] + AvroConversionUtil.unpackRecord(record) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts + * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries + * to work with all 3 Avro data mappings (Generic, Specific and Reflect). + */ +class AvroWrapperToJavaConverter extends Converter[Any, Any] { + override def convert(obj: Any): Any = { if (obj == null) { return null } - schema.getType match { - case UNION => unpackUnion(obj, schema) - case ARRAY => unpackArray(obj, schema) - case FIXED => unpackFixed(obj, schema) - case MAP => unpackMap(obj, schema) - case BYTES => unpackBytes(obj) - case RECORD => unpackRecord(obj) - case STRING => obj.toString - case ENUM => obj.toString - case NULL => obj - case BOOLEAN => obj - case DOUBLE => obj - case FLOAT => obj - case INT => obj - case LONG => obj - case other => throw new SparkException( - s"Unknown Avro schema type ${other.getName}") + obj.asInstanceOf[AvroWrapper[_]].datum() match { + case null => null + case record: IndexedRecord => AvroConversionUtil.unpackRecord(record) + case other => throw new SparkException( + s"Unsupported top-level Avro data type ${other.getClass.getName}") } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org