Github user BryanCutler commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r144601470 --- Diff: python/pyspark/sql/session.py --- @@ -510,9 +511,43 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr except Exception: has_pandas = False if has_pandas and isinstance(data, pandas.DataFrame): - if schema is None: - schema = [str(x) for x in data.columns] - data = [r.tolist() for r in data.to_records(index=False)] + if self.conf.get("spark.sql.execution.arrow.enabled", "false").lower() == "true" \ + and len(data) > 0: + from pyspark.serializers import ArrowSerializer + from pyspark.sql.types import from_arrow_schema + import pyarrow as pa + + # Slice the DataFrame into batches + split = -(-len(data) // self.sparkContext.defaultParallelism) # round int up + slices = (data[i:i + split] for i in xrange(0, len(data), split)) + batches = [pa.RecordBatch.from_pandas(sliced_df, preserve_index=False) + for sliced_df in slices] + + # write batches to temp file, read by JVM (borrowed from context.parallelize) + import os + from tempfile import NamedTemporaryFile + tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir) + try: + serializer = ArrowSerializer() + serializer.dump_stream(batches, tempFile) + tempFile.close() + readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile + jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches)) + finally: + # readRDDFromFile eagerly reads the file so we can delete right after. + os.unlink(tempFile.name) + + # Create the Spark DataFrame, there will be at least 1 batch + schema = from_arrow_schema(batches[0].schema) + jdf = self._jvm.PythonSQLUtils.arrowPayloadToDataFrame( + jrdd, schema.json(), self._wrapped._jsqlContext) + df = DataFrame(jdf, self._wrapped) + df._schema = schema --- End diff -- If the schema is not set here, then it will lazily create it through a py4j exchange with the Java DataFrame. 
Since we already have it here, we can just set it and save some time. I don't like manually setting it like this, though; it should be an optional arg in the DataFrame constructor. I'll make that change, but if you prefer not to do that I can revert.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org