git commit: [SPARK-4192][SQL] Internal API for Python UDT
Repository: spark Updated Branches: refs/heads/master c5912ecc7 - 04450d115 [SPARK-4192][SQL] Internal API for Python UDT Following #2919, this PR adds Python UDT (for internal use only) with tests under pyspark.tests. Before `SQLContext.applySchema`, we check whether we need to convert user-type instances into SQL recognizable data. In the current implementation, a Python UDT must be paired with a Scala UDT for serialization on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and Python. marmbrus jkbradley davies Author: Xiangrui Meng m...@databricks.com Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits: acff637 [Xiangrui Meng] merge master dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well 2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion 7c4a6a9 [Xiangrui Meng] address comments 75223db [Xiangrui Meng] minor update f740379 [Xiangrui Meng] remove UDT from default imports e98d9d0 [Xiangrui Meng] fix py style 4e84fce [Xiangrui Meng] remove local hive tests and add more tests 39f19e0 [Xiangrui Meng] add tests b7f666d [Xiangrui Meng] add Python UDT Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04450d11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04450d11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04450d11 Branch: refs/heads/master Commit: 04450d11548cfb25d4fb77d4a33e3a7cd4254183 Parents: c5912ec Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 19:29:11 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 19:29:11 2014 -0800 -- python/pyspark/sql.py | 206 ++- python/pyspark/tests.py | 93 - .../spark/sql/catalyst/types/dataTypes.scala| 9 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 + .../apache/spark/sql/execution/pythonUdfs.scala | 5 + .../apache/spark/sql/test/ExamplePointUDT.scala | 64 ++ 
 .../sql/types/util/DataTypeConversions.scala | 1 -
 7 files changed, 375 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/04450d11/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 675df08..d16c18b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -417,6 +417,75 @@ class StructType(DataType):
         return StructType([StructField.fromJson(f) for f in json["fields"]])
 
 
+class UserDefinedType(DataType):
+    """
+    :: WARN: Spark Internal Use Only ::
+    SQL User-Defined Type (UDT).
+    """
+
+    @classmethod
+    def typeName(cls):
+        return cls.__name__.lower()
+
+    @classmethod
+    def sqlType(cls):
+        """
+        Underlying SQL storage type for this UDT.
+        """
+        raise NotImplementedError("UDT must implement sqlType().")
+
+    @classmethod
+    def module(cls):
+        """
+        The Python module of the UDT.
+        """
+        raise NotImplementedError("UDT must implement module().")
+
+    @classmethod
+    def scalaUDT(cls):
+        """
+        The class name of the paired Scala UDT.
+        """
+        raise NotImplementedError("UDT must have a paired Scala UDT.")
+
+    def serialize(self, obj):
+        """
+        Converts the a user-type object into a SQL datum.
+        """
+        raise NotImplementedError("UDT must implement serialize().")
+
+    def deserialize(self, datum):
+        """
+        Converts a SQL datum into a user-type object.
+        """
+        raise NotImplementedError("UDT must implement deserialize().")
+
+    def json(self):
+        return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True)
+
+    def jsonValue(self):
+        schema = {
+            "type": "udt",
+            "class": self.scalaUDT(),
+            "pyClass": "%s.%s" % (self.module(), type(self).__name__),
+            "sqlType": self.sqlType().jsonValue()
+        }
+        return schema
+
+    @classmethod
+    def fromJson(cls, json):
+        pyUDT = json["pyClass"]
+        split = pyUDT.rfind(".")
+        pyModule = pyUDT[:split]
+        pyClass = pyUDT[split+1:]
+        m = __import__(pyModule, globals(), locals(), [pyClass], -1)
+        UDT = getattr(m, pyClass)
+        return UDT()
+
+    def __eq__(self, other):
+        return type(self) == type(other)
+
+
 _all_primitive_types = dict((v.typeName(), v)
                             for v in globals().itervalues()
                             if type(v) is PrimitiveTypeSingleton and
@@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string):
     ... complex_arraytype, False)
     >>> check_datatype(complex_maptype)
     True
+    >>> check_datatype(ExamplePointUDT())
+
git commit: [SPARK-4192][SQL] Internal API for Python UDT
Repository: spark Updated Branches: refs/heads/branch-1.2 0826eed9c - 42d02db86 [SPARK-4192][SQL] Internal API for Python UDT Following #2919, this PR adds Python UDT (for internal use only) with tests under pyspark.tests. Before `SQLContext.applySchema`, we check whether we need to convert user-type instances into SQL recognizable data. In the current implementation, a Python UDT must be paired with a Scala UDT for serialization on the JVM side. A following PR will add VectorUDT in MLlib for both Scala and Python. marmbrus jkbradley davies Author: Xiangrui Meng m...@databricks.com Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits: acff637 [Xiangrui Meng] merge master dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well 2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion 7c4a6a9 [Xiangrui Meng] address comments 75223db [Xiangrui Meng] minor update f740379 [Xiangrui Meng] remove UDT from default imports e98d9d0 [Xiangrui Meng] fix py style 4e84fce [Xiangrui Meng] remove local hive tests and add more tests 39f19e0 [Xiangrui Meng] add tests b7f666d [Xiangrui Meng] add Python UDT (cherry picked from commit 04450d11548cfb25d4fb77d4a33e3a7cd4254183) Signed-off-by: Xiangrui Meng m...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d02db8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d02db8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d02db8 Branch: refs/heads/branch-1.2 Commit: 42d02db86cd973cf31ceeede0c5a723238bbe746 Parents: 0826eed Author: Xiangrui Meng m...@databricks.com Authored: Mon Nov 3 19:29:11 2014 -0800 Committer: Xiangrui Meng m...@databricks.com Committed: Mon Nov 3 19:30:32 2014 -0800 -- python/pyspark/sql.py | 206 ++- python/pyspark/tests.py | 93 - .../spark/sql/catalyst/types/dataTypes.scala| 9 +- .../scala/org/apache/spark/sql/SQLContext.scala | 2 + 
 .../apache/spark/sql/execution/pythonUdfs.scala | 5 +
 .../apache/spark/sql/test/ExamplePointUDT.scala | 64 ++
 .../sql/types/util/DataTypeConversions.scala | 1 -
 7 files changed, 375 insertions(+), 5 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/spark/blob/42d02db8/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 675df08..d16c18b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -417,6 +417,75 @@ class StructType(DataType):
         return StructType([StructField.fromJson(f) for f in json["fields"]])
 
 
+class UserDefinedType(DataType):
+    """
+    :: WARN: Spark Internal Use Only ::
+    SQL User-Defined Type (UDT).
+    """
+
+    @classmethod
+    def typeName(cls):
+        return cls.__name__.lower()
+
+    @classmethod
+    def sqlType(cls):
+        """
+        Underlying SQL storage type for this UDT.
+        """
+        raise NotImplementedError("UDT must implement sqlType().")
+
+    @classmethod
+    def module(cls):
+        """
+        The Python module of the UDT.
+        """
+        raise NotImplementedError("UDT must implement module().")
+
+    @classmethod
+    def scalaUDT(cls):
+        """
+        The class name of the paired Scala UDT.
+        """
+        raise NotImplementedError("UDT must have a paired Scala UDT.")
+
+    def serialize(self, obj):
+        """
+        Converts the a user-type object into a SQL datum.
+        """
+        raise NotImplementedError("UDT must implement serialize().")
+
+    def deserialize(self, datum):
+        """
+        Converts a SQL datum into a user-type object.
+        """
+        raise NotImplementedError("UDT must implement deserialize().")
+
+    def json(self):
+        return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True)
+
+    def jsonValue(self):
+        schema = {
+            "type": "udt",
+            "class": self.scalaUDT(),
+            "pyClass": "%s.%s" % (self.module(), type(self).__name__),
+            "sqlType": self.sqlType().jsonValue()
+        }
+        return schema
+
+    @classmethod
+    def fromJson(cls, json):
+        pyUDT = json["pyClass"]
+        split = pyUDT.rfind(".")
+        pyModule = pyUDT[:split]
+        pyClass = pyUDT[split+1:]
+        m = __import__(pyModule, globals(), locals(), [pyClass], -1)
+        UDT = getattr(m, pyClass)
+        return UDT()
+
+    def __eq__(self, other):
+        return type(self) == type(other)
+
+
 _all_primitive_types = dict((v.typeName(), v)
                             for v in globals().itervalues()
                             if type(v) is PrimitiveTypeSingleton and
@@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string):
     ...