Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19459#discussion_r145293209 --- Diff: python/pyspark/sql/session.py --- @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema): data = [schema.toInternal(row) for row in data] return self._sc.parallelize(data), schema + def _createFromPandasWithArrow(self, pdf, schema): + """ + Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting + to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the + data types will be used to coerce the data in Pandas to Arrow conversion. + """ + from pyspark.serializers import ArrowSerializer + from pyspark.sql.types import from_arrow_schema, to_arrow_schema + import pyarrow as pa + + # Slice the DataFrame into batches + step = -(-len(pdf) // self.sparkContext.defaultParallelism) # round int up + pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step)) + arrow_schema = to_arrow_schema(schema) if schema is not None else None + batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False) + for pdf_slice in pdf_slices] + + # Verify schema, there will be at least 1 batch from pandas.DataFrame + schema_from_arrow = from_arrow_schema(batches[0].schema) + if schema is not None and schema != schema_from_arrow: + raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " + + "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow))) --- End diff -- For now, I'd prefer 1. if matching the results becomes too complicated. I guess 2. could fail for some unexpected reason when compared with `createDataFrame` without Arrow, so I think 1. is required as a guarantee and 2. is good to do. I wonder what @ueshin thinks about this.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org