Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145334576
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,39 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(pdf_slice, schema=arrow_schema, preserve_index=False)
    +                   for pdf_slice in pdf_slices]
    +
    +        # Verify schema, there will be at least 1 batch from pandas.DataFrame
    +        schema_from_arrow = from_arrow_schema(batches[0].schema)
    +        if schema is not None and schema != schema_from_arrow:
    +            raise ValueError("Supplied schema does not match result from Arrow\nsupplied: " +
    +                             "%s\n!=\nfrom Arrow: %s" % (str(schema), str(schema_from_arrow)))
    --- End diff --
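
The slicing step in the diff relies on ceiling division (`-(-n // k)`) to pick a batch size. A minimal standalone sketch of that arithmetic follows; `slice_bounds` is a hypothetical helper name, not part of the PR. Note that for an empty input the step is 0 and `range`/`xrange` raises `ValueError`, so the sketch guards that case explicitly.

```python
def slice_bounds(n_rows, parallelism):
    """Return (start, stop) pairs that cover range(n_rows) in at most `parallelism` slices."""
    if n_rows == 0:
        # A step of 0 would make range() raise ValueError, so return no slices.
        return []
    step = -(-n_rows // parallelism)  # ceiling division: round n_rows / parallelism up
    return [(start, min(start + step, n_rows)) for start in range(0, n_rows, step)]


print(slice_bounds(10, 4))  # [(0, 3), (3, 6), (6, 9), (9, 10)]
```

With 10 rows and a parallelism of 4 the step is 3, so the last slice is shorter than the others.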
    
    It's okay to fall back with a warning, but I think we should first try to adjust the data to the types specified by the user. Otherwise, users who don't know how to adjust the types themselves can never get the benefit of Arrow, especially for integral columns that contain NaN values.
    We can split the pandas DataFrame into Series once and adjust the types while building the RecordBatches. I guess we will have to modify the timestamp values of each Series to carry a timezone anyway once we support the timestamp type.

