Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/19459#discussion_r145032174
--- Diff: python/pyspark/sql/session.py ---
@@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
            data = [schema.toInternal(row) for row in data]
        return self._sc.parallelize(data), schema

+    def _createFromPandasWithArrow(self, df, schema):
+        """
+        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions,
+        converting to Arrow data, then reading into the JVM to parallelize. If a schema
+        is passed in, the data types will be used to coerce the data in Pandas to Arrow
+        conversion.
+        """
+        import os
+        from tempfile import NamedTemporaryFile
+        from pyspark.serializers import ArrowSerializer
+        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
+        import pyarrow as pa
+
+        # Slice the DataFrame into batches
+        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
+        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
+        arrow_schema = to_arrow_schema(schema) if schema is not None else None
+        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
+                   for df_slice in df_slices]
+
+        # Write batches to a temp file, read by the JVM (borrowed from context.parallelize)
+        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
+        try:
+            serializer = ArrowSerializer()
+            serializer.dump_stream(batches, tempFile)
+            tempFile.close()
+            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
+            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
+        finally:
+            # readRDDFromFile eagerly reads the file, so we can delete it right after
+            os.unlink(tempFile.name)
+
+        # Create the Spark DataFrame; there will be at least 1 batch
+        schema = from_arrow_schema(batches[0].schema)
--- End diff --
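
As a side note for anyone trying this out, the ceiling-division slicing in the diff can be exercised with plain pandas/pyarrow outside of Spark. A minimal sketch (local names only, nothing here is Spark API):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"a": list(range(10)), "b": [float(i) for i in range(10)]})
parallelism = 4

# -(-n // d) is ceiling division: the slice size is rounded up so the
# slices cover every row of the frame, with the last slice taking the remainder.
step = -(-len(df) // parallelism)
df_slices = (df[start:start + step] for start in range(0, len(df), step))

batches = [pa.RecordBatch.from_pandas(df_slice, preserve_index=False)
           for df_slice in df_slices]
print(len(batches))       # 4 slices of up to 3 rows each
print(batches[0].schema)  # schema Arrow inferred from the pandas dtypes
```
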
But I thought we should reduce the diff between `createDataFrame` with
Arrow and `createDataFrame` without Arrow, and match their behaviour first.
To be clear, my suggestion is:
> 2. We could check that the user-supplied schema matches the ArrowRecordBatch
schema and fail immediately if they are not equal. I'm not sure whether there are
cases where you wouldn't want it to fail, like with different integer types...
but if the schemas differ, fall back to `createDataFrame` without Arrow to
reduce the differences between the two paths. My careful guess is that
`createDataFrame` is stricter?
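
Concretely, the check I have in mind could look roughly like the sketch below. `arrow_schema_matches` is a hypothetical helper, not anything in this PR; `to_arrow_schema` is the same utility the diff already imports. On a mismatch we would take the existing non-Arrow `createDataFrame` path instead of failing:

```python
import pandas as pd
import pyarrow as pa
from pyspark.sql.types import StructType, StructField, IntegerType, to_arrow_schema

def arrow_schema_matches(pdf, spark_schema):
    """Sketch: does the user-supplied Spark schema equal the schema
    Arrow would infer from the pandas.DataFrame?"""
    inferred = pa.RecordBatch.from_pandas(pdf, preserve_index=False).schema
    return to_arrow_schema(spark_schema) == inferred

pdf = pd.DataFrame({"a": [1, 2, 3]})                    # pandas infers int64
schema = StructType([StructField("a", IntegerType())])  # user asks for int32
print(arrow_schema_matches(pdf, schema))                # False -> would fall back
```
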