Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145293209
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
    
    I'd prefer option 1 for now if matching the results becomes too complicated. I suspect option 2 could fail for unexpected reasons when compared against `createDataFrame` without Arrow, so I think option 1 is required as a guarantee and option 2 is good to have. I wonder what @ueshin thinks about this.
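
    For reference, the batch slicing in the diff relies on a ceiling-division idiom, `-(-n // k)`. Below is a minimal sketch of that idea using plain lists instead of a pandas.DataFrame. It assumes at least one row, and `slice_bounds` is a hypothetical helper introduced only for illustration, not something in the PR:

```python
def slice_bounds(n_rows, n_partitions):
    """Return (start, stop) pairs covering n_rows in chunks of ceil(n_rows / n_partitions).

    Hypothetical helper for illustration; assumes n_rows > 0 and n_partitions > 0.
    """
    # Ceiling division without floats: negate, floor-divide, negate again.
    step = -(-n_rows // n_partitions)
    return [(start, min(start + step, n_rows)) for start in range(0, n_rows, step)]


rows = list(range(10))
bounds = slice_bounds(len(rows), 4)
# Each slice holds at most `step` rows; the last one may be shorter.
slices = [rows[start:stop] for start, stop in bounds]
```

    Note that the number of slices can be smaller than the requested partition count, for example 9 rows over 4 partitions gives a step of 3 and therefore only 3 slices.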

