Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19339#discussion_r141055958
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -420,7 +425,25 @@ def csv(self, path, schema=None, sep=None,
encoding=None, quote=None, escape=Non
columnNameOfCorruptRecord=columnNameOfCorruptRecord,
multiLine=multiLine)
if isinstance(path, basestring):
path = [path]
- return
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ if type(path) == list:
+ return
self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+ elif isinstance(path, RDD):
+ def func(iterator):
+ for x in iterator:
+ if not isinstance(x, basestring):
+ x = unicode(x)
+ if isinstance(x, unicode):
+ x = x.encode("utf-8")
+ yield x
+ keyed = path.mapPartitions(func)
+ keyed._bypass_serializer = True
+ jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
+ jdataset = self._spark._ssql_ctx.createDataset(
--- End diff --
Let's add a small comment here to explain why we should create the dataset
(which could look a bit weird in PySpark I believe).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]