Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19459#discussion_r145294433
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
        data = [schema.toInternal(row) for row in data]
        return self._sc.parallelize(data), schema

+    def _createFromPandasWithArrow(self, df, schema):
+        """
+        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
+        to Arrow data, then reading into the JVM to parallelize. If a schema is passed in, the
+        data types will be used to coerce the data in Pandas to Arrow conversion.
+        """
+        import os
+        from tempfile import NamedTemporaryFile
+        from pyspark.serializers import ArrowSerializer
+        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
+        import pyarrow as pa
+
+        # Slice the DataFrame into batches
+        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
+        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
+        arrow_schema = to_arrow_schema(schema) if schema is not None else None
+        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
+                   for df_slice in df_slices]
+
+        # write batches to temp file, read by JVM (borrowed from context.parallelize)
+        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+        try:
+            serializer = ArrowSerializer()
+            serializer.dump_stream(batches, tempFile)
+            tempFile.close()
+            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
+            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
+        finally:
+            # readRDDFromFile eagerly reads the file so we can delete right after
+            os.unlink(tempFile.name)
+
+        # Create the Spark DataFrame, there will be at least 1 batch
+        schema = from_arrow_schema(batches[0].schema)
--- End diff ---
Hmmmm.. still get the same exception with pyarrow 0.4.1 ..
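
For anyone trying to reproduce this outside the Spark codebase, below is a minimal standalone sketch of the same slice-into-batches-then-temp-file flow. The sample frame, num_partitions, and the plain Arrow stream format are illustrative stand-ins only; the actual method goes through ArrowSerializer and PythonRDD.readRDDFromFile, and older pyarrow releases (such as 0.4.1 above) may not behave the same way.

    import os
    from tempfile import NamedTemporaryFile

    import pandas as pd
    import pyarrow as pa

    pdf = pd.DataFrame({"a": range(10), "b": [float(i) for i in range(10)]})
    num_partitions = 4  # stands in for self.sparkContext.defaultParallelism

    # Ceiling division, same trick as the diff: -(-len // n) rounds up
    step = -(-len(pdf) // num_partitions)
    slices = (pdf[start:start + step] for start in range(0, len(pdf), step))
    batches = [pa.RecordBatch.from_pandas(s, preserve_index=False) for s in slices]

    # Write the batches to a temp file in the Arrow stream format, let a reader
    # consume it eagerly, then delete the file right after, mirroring the
    # readRDDFromFile handoff in the diff
    tmp = NamedTemporaryFile(delete=False)
    try:
        writer = pa.RecordBatchStreamWriter(tmp, batches[0].schema)
        for batch in batches:
            writer.write_batch(batch)
        writer.close()
        tmp.close()
        with open(tmp.name, "rb") as f:  # eager read, stand-in for the JVM side
            payload = f.read()
    finally:
        os.unlink(tmp.name)

    print(len(batches), len(payload))

The ceiling division keeps the number of slices at or below the target parallelism while still covering every row, which is why the diff can assume there is at least one batch when it rebuilds the schema.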
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]