Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19339#discussion_r141031861
--- Diff: python/pyspark/sql/readwriter.py ---
@@ -420,7 +425,22 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
             columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
         if isinstance(path, basestring):
            path = [path]
-        return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        if type(path) == list:
+            return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
+        elif isinstance(path, RDD):
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
--- End diff ---
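For context, the bridging in the hunk above ships each record to the JVM as raw UTF-8 bytes and then rebuilds a `JavaRDD[String]` with `BytesToString`, essentially the same trick `json()` uses for an RDD input. A minimal standalone sketch of that pattern is below (the variable names and the Python 3 `str` handling are mine, and it assumes a running PySpark session; `_bypass_serializer` and `_jrdd` are internal attributes, shown only for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
lines = spark.sparkContext.parallelize(["a,1", "b,2"])

def to_utf8(iterator):
    # Emit plain UTF-8 bytes so the JVM side receives byte arrays, not pickled objects.
    for x in iterator:
        if not isinstance(x, str):
            x = str(x)
        yield x.encode("utf-8")

keyed = lines.mapPartitions(to_utf8)
keyed._bypass_serializer = True  # _jrdd then exposes the raw byte arrays as-is
jrdd = keyed._jrdd.map(spark._jvm.BytesToString())  # JavaRDD[String] on the JVM side
```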
I tried a way within Python and this seems to work:
```diff
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ed452d895b..0f54065b3ee 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -438,7 +438,10 @@ class DataFrameReader(OptionUtils):
             keyed = path.mapPartitions(func)
             keyed._bypass_serializer = True
             jrdd = keyed._jrdd.map(self._spark._jvm.BytesToString())
-            return self._df(self._jreader.csv(jrdd))
+            jdataset = self._spark._jsqlContext.createDataset(
+                jrdd.rdd(),
+                self._spark._sc._jvm.Encoders.STRING())
+            return self._df(self._jreader.csv(jdataset))
         else:
             raise TypeError("path can be only string, list or RDD")
```
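With that applied on top of this PR, `spark.read.csv` should accept an RDD of CSV strings directly. A quick usage sketch (the `header` option and the sample data here are just for illustration):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# An RDD of CSV lines, e.g. produced by sc.textFile(...) plus some preprocessing.
lines = spark.sparkContext.parallelize(["name,age", "Alice,1", "Bob,2"])

df = spark.read.option("header", "true").csv(lines)
df.show()
```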