git commit: [SPARK-4192][SQL] Internal API for Python UDT

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/master c5912ecc7 -> 04450d115


[SPARK-4192][SQL] Internal API for Python UDT

Following #2919, this PR adds Python UDT support (for internal use only) with
tests under pyspark.tests. Before `SQLContext.applySchema`, we check whether we
need to convert user-type instances into SQL-recognizable data. In the current
implementation, a Python UDT must be paired with a Scala UDT for serialization
on the JVM side. A follow-up PR will add VectorUDT in MLlib for both Scala and
Python.
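
For context, a minimal sketch of what a Python UDT built on this internal API
could look like, loosely modeled on the ExamplePoint test classes this PR adds
under pyspark/tests.py and org.apache.spark.sql.test.ExamplePointUDT (the
names and method bodies below are illustrative, not copied from the patch):

from pyspark.sql import UserDefinedType, ArrayType, DoubleType

class ExamplePointUDT(UserDefinedType):
    """UDT for a 2-D point, stored in SQL as an array of two doubles."""

    @classmethod
    def sqlType(cls):
        # Underlying SQL storage type for the serialized form.
        return ArrayType(DoubleType(), False)

    @classmethod
    def module(cls):
        # Module that fromJson() will import to rebuild this UDT.
        return "pyspark.tests"

    @classmethod
    def scalaUDT(cls):
        # Fully qualified name of the paired Scala UDT (required for now).
        return "org.apache.spark.sql.test.ExamplePointUDT"

    def serialize(self, obj):
        return [obj.x, obj.y]

    def deserialize(self, datum):
        return ExamplePoint(datum[0], datum[1])

class ExamplePoint(object):
    """A user-type instance; __UDT__ tells Spark SQL how to convert it."""

    __UDT__ = ExamplePointUDT()

    def __init__(self, x, y):
        self.x = x
        self.y = y

With a class like this in place, a row containing an ExamplePoint can go
through SQLContext.applySchema with an ExamplePointUDT() field in the schema;
the pre-applySchema conversion check mentioned above serializes each point to
[x, y] before it reaches the JVM, and deserialize() restores it when rows come
back to Python.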

marmbrus jkbradley davies

Author: Xiangrui Meng m...@databricks.com

Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits:

acff637 [Xiangrui Meng] merge master
dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well
2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion
7c4a6a9 [Xiangrui Meng] address comments
75223db [Xiangrui Meng] minor update
f740379 [Xiangrui Meng] remove UDT from default imports
e98d9d0 [Xiangrui Meng] fix py style
4e84fce [Xiangrui Meng] remove local hive tests and add more tests
39f19e0 [Xiangrui Meng] add tests
b7f666d [Xiangrui Meng] add Python UDT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04450d11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04450d11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04450d11

Branch: refs/heads/master
Commit: 04450d11548cfb25d4fb77d4a33e3a7cd4254183
Parents: c5912ec
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 19:29:11 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 19:29:11 2014 -0800

--
 python/pyspark/sql.py   | 206 ++-
 python/pyspark/tests.py |  93 -
 .../spark/sql/catalyst/types/dataTypes.scala|   9 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +
 .../apache/spark/sql/execution/pythonUdfs.scala |   5 +
 .../apache/spark/sql/test/ExamplePointUDT.scala |  64 ++
 .../sql/types/util/DataTypeConversions.scala|   1 -
 7 files changed, 375 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/04450d11/python/pyspark/sql.py
--
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 675df08..d16c18b 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -417,6 +417,75 @@ class StructType(DataType):
         return StructType([StructField.fromJson(f) for f in json["fields"]])


+class UserDefinedType(DataType):
+    """
+    :: WARN: Spark Internal Use Only ::
+    SQL User-Defined Type (UDT).
+    """
+
+    @classmethod
+    def typeName(cls):
+        return cls.__name__.lower()
+
+    @classmethod
+    def sqlType(cls):
+        """
+        Underlying SQL storage type for this UDT.
+        """
+        raise NotImplementedError("UDT must implement sqlType().")
+
+    @classmethod
+    def module(cls):
+        """
+        The Python module of the UDT.
+        """
+        raise NotImplementedError("UDT must implement module().")
+
+    @classmethod
+    def scalaUDT(cls):
+        """
+        The class name of the paired Scala UDT.
+        """
+        raise NotImplementedError("UDT must have a paired Scala UDT.")
+
+    def serialize(self, obj):
+        """
+        Converts a user-type object into a SQL datum.
+        """
+        raise NotImplementedError("UDT must implement serialize().")
+
+    def deserialize(self, datum):
+        """
+        Converts a SQL datum into a user-type object.
+        """
+        raise NotImplementedError("UDT must implement deserialize().")
+
+    def json(self):
+        return json.dumps(self.jsonValue(), separators=(',', ':'), sort_keys=True)
+
+    def jsonValue(self):
+        schema = {
+            "type": "udt",
+            "class": self.scalaUDT(),
+            "pyClass": "%s.%s" % (self.module(), type(self).__name__),
+            "sqlType": self.sqlType().jsonValue()
+        }
+        return schema
+
+    @classmethod
+    def fromJson(cls, json):
+        pyUDT = json["pyClass"]
+        split = pyUDT.rfind(".")
+        pyModule = pyUDT[:split]
+        pyClass = pyUDT[split+1:]
+        m = __import__(pyModule, globals(), locals(), [pyClass], -1)
+        UDT = getattr(m, pyClass)
+        return UDT()
+
+    def __eq__(self, other):
+        return type(self) == type(other)
+
+
 _all_primitive_types = dict((v.typeName(), v)
                             for v in globals().itervalues()
                             if type(v) is PrimitiveTypeSingleton and
@@ -469,6 +538,12 @@ def _parse_datatype_json_string(json_string):
     ...                           complex_arraytype, False)
     >>> check_datatype(complex_maptype)
     True
+    >>> check_datatype(ExamplePointUDT())
+    True
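
The jsonValue()/fromJson() pair above is what lets a schema containing a UDT
round-trip through the JSON datatype string exchanged with the JVM: jsonValue()
records the paired Scala UDT class, the Python class (pyClass), and the
underlying sqlType, while fromJson() re-imports pyClass to rebuild the UDT. A
rough illustration, reusing the hypothetical ExamplePointUDT sketched earlier
(the printed JSON is indicative only):

import json

from pyspark.sql import UserDefinedType

udt = ExamplePointUDT()
print udt.json()
# roughly: {"class":"org.apache.spark.sql.test.ExamplePointUDT",
#           "pyClass":"pyspark.tests.ExamplePointUDT",
#           "sqlType":{"containsNull":false,"elementType":"double","type":"array"},
#           "type":"udt"}

# fromJson() imports the module named in pyClass and instantiates the class,
# so parsing the JSON value yields an equal UDT instance (assuming
# ExamplePointUDT really is importable from the module its module() names):
assert UserDefinedType.fromJson(json.loads(udt.json())) == udt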

git commit: [SPARK-4192][SQL] Internal API for Python UDT

2014-11-03 Thread meng
Repository: spark
Updated Branches:
  refs/heads/branch-1.2 0826eed9c -> 42d02db86


[SPARK-4192][SQL] Internal API for Python UDT

Following #2919, this PR adds Python UDT support (for internal use only) with
tests under pyspark.tests. Before `SQLContext.applySchema`, we check whether we
need to convert user-type instances into SQL-recognizable data. In the current
implementation, a Python UDT must be paired with a Scala UDT for serialization
on the JVM side. A follow-up PR will add VectorUDT in MLlib for both Scala and
Python.

marmbrus jkbradley davies

Author: Xiangrui Meng m...@databricks.com

Closes #3068 from mengxr/SPARK-4192-sql and squashes the following commits:

acff637 [Xiangrui Meng] merge master
dba5ea7 [Xiangrui Meng] only use pyClass for Python UDT output sqlType as well
2c9d7e4 [Xiangrui Meng] move import to global setup; update needsConversion
7c4a6a9 [Xiangrui Meng] address comments
75223db [Xiangrui Meng] minor update
f740379 [Xiangrui Meng] remove UDT from default imports
e98d9d0 [Xiangrui Meng] fix py style
4e84fce [Xiangrui Meng] remove local hive tests and add more tests
39f19e0 [Xiangrui Meng] add tests
b7f666d [Xiangrui Meng] add Python UDT

(cherry picked from commit 04450d11548cfb25d4fb77d4a33e3a7cd4254183)
Signed-off-by: Xiangrui Meng m...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42d02db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42d02db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42d02db8

Branch: refs/heads/branch-1.2
Commit: 42d02db86cd973cf31ceeede0c5a723238bbe746
Parents: 0826eed
Author: Xiangrui Meng m...@databricks.com
Authored: Mon Nov 3 19:29:11 2014 -0800
Committer: Xiangrui Meng m...@databricks.com
Committed: Mon Nov 3 19:30:32 2014 -0800

--
 python/pyspark/sql.py   | 206 ++-
 python/pyspark/tests.py |  93 -
 .../spark/sql/catalyst/types/dataTypes.scala|   9 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |   2 +
 .../apache/spark/sql/execution/pythonUdfs.scala |   5 +
 .../apache/spark/sql/test/ExamplePointUDT.scala |  64 ++
 .../sql/types/util/DataTypeConversions.scala|   1 -
 7 files changed, 375 insertions(+), 5 deletions(-)
--

