Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1598#discussion_r15620424
--- Diff: python/pyspark/sql.py ---
@@ -467,6 +380,108 @@ def _parse_datatype_string(datatype_string):
return StructType(fields)
+
+_cached_namedtuples = {}
+
+def _restore_object(name, fields, obj):
+ """ Restore namedtuple object during unpickling. """
+ cls = _cached_namedtuples.get(fields)
+ if cls is None:
+ cls = namedtuple(name, fields)
+ def __reduce__(self):
+ return (_restore_object, (name, fields, tuple(self)))
+ cls.__reduce__ = __reduce__
+ _cached_namedtuples[fields] = cls
+ return cls(*obj)
+
+def _create_object(cls, v):
+ """ Create an customized object with class `cls`. """
+ return cls(v) if v is not None else v
+
+def _create_getter(dt, i):
+ """ Create a getter for item `i` with schema """
+ cls = _create_cls(dt)
+ def getter(self):
+ return _create_object(cls, self[i])
+ return getter
+
+def _has_struct(dt):
+ """Return whether `dt` is or has StructType in it"""
+ if isinstance(dt, StructType):
+ return True
+ elif isinstance(dt, ArrayType):
+ return _has_struct(dt.elementType)
+ elif isinstance(dt, MapType):
+ return _has_struct(dt.valueType)
+ return False
+
+def _create_cls(dataType):
+ """
+ Create an class by dataType
+
+ The created class is similar to namedtuple, but can have nested schema.
+ """
+ from operator import itemgetter
+
+ if isinstance(dataType, ArrayType):
+ cls = _create_cls(dataType.elementType)
+ class List(list):
+ def __getitem__(self, i):
+ # create object with datetype
+ return _create_object(cls, list.__getitem__(self, i))
+ def __repr__(self):
+ # call collect __repr__ for nested objects
+ return "[%s]" % (", ".join(repr(self[i])
+ for i in range(len(self))))
+ def __reduce__(self):
+ # pickle as dict, the nested struct can be reduced by
itself
+ return (list, (list(self),))
+ return List
+
+ elif isinstance(dataType, MapType):
+ vcls = _create_cls(dataType.valueType)
+ class Dict(dict):
+ def __getitem__(self, k):
+ # create object with datetype
+ return _create_object(vcls, dict.__getitem__(self, k))
+ def __repr__(self):
+ # call collect __repr__ for nested objects
+ return "{%s}" % (", ".join("%r: %r" % (k, self[k])
+ for k in self))
+ def __reduce__(self):
+ # pickle as dict, the nested struct can be reduced by
itself
+ return (dict, (dict(self),))
+ return Dict
+
+ elif not isinstance(dataType, StructType):
+ raise Exception("unexpected data type: %s" % dataType)
+
+ class Row(tuple):
+ """ Row in SchemaRDD """
+ _fields = tuple(f.name for f in dataType.fields)
+
+ # create property for fast access
+ # use local vars begins with "_"
+ for _i,_f in enumerate(dataType.fields):
+ if _has_struct(_f.dataType):
+ # delay creating object until accessing it
+ _getter = _create_getter(_f.dataType, _i)
+ else:
+ _getter = itemgetter(_i)
+ locals()[_f.name] = property(_getter)
+ del _i, _f, _getter
+
+ def __repr__(self):
+ # call collect __repr__ for nested objects
+ return ("Row(%s)" % ", ".join("%s=%r" % (n, getattr(self, n))
+ for n in self._fields))
+ def __reduce__(self):
+ # pickle as namedtuple
+ return (_restore_object, ("Row", self._fields, tuple(self)))
--- End diff --
I think we should have some limitation about name of fields, they could not
be anything. We already have this limit in SQL parser, the name of field can
not be key words, such `group`.
How to address these, check them with an black list and raise exception?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---