Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150324146
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -454,13 +454,60 @@ def _convert_from_pandas(self, pdf, schema):
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records], schema
     
             # Convert list of numpy records to python lists
             return [r.tolist() for r in np_records], schema
     
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if schema is None or isinstance(schema, list):
    +            schema_from_arrow = from_arrow_schema(batches[0].schema)
    +            names = pdf.columns if schema is None else schema
    --- End diff ---
    
    hmm, actually that case can be pulled out of both here and the non-Arrow `_create_from_pandas`, which @ueshin brought up in #19646. That would simplify things quite a bit now, so I'll try that out.
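
    Roughly the shape I have in mind, just as a sketch (the helper name and signature here are hypothetical, not necessarily what I'll end up committing):

    ```python
    def _resolve_column_names(pdf, schema):
        """Hypothetical helper: resolve column names once, so the Arrow and
        non-Arrow create paths don't each have to special-case a None or
        list-of-strings schema."""
        if schema is None:
            return list(pdf.columns)   # fall back to the pandas column labels
        if isinstance(schema, list):
            return list(schema)        # caller supplied the names directly
        return None                    # a StructType already carries its names
    ```

    Both `_create_from_pandas` and `_create_from_pandas_with_arrow` could then take the resolved names as an argument instead of re-deriving them.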


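    For anyone skimming the diff, a tiny standalone illustration of the slicing step (plain pandas + pyarrow here, nothing Spark-specific): `-(-len(pdf) // n)` is just ceiling division, so every slice except possibly the last gets `step` rows.

    ```python
    import pandas as pd
    import pyarrow as pa

    pdf = pd.DataFrame({"a": range(10), "b": [float(i) for i in range(10)]})
    parallelism = 3

    step = -(-len(pdf) // parallelism)   # ceil(10 / 3) == 4
    pdf_slices = (pdf[start:start + step] for start in range(0, len(pdf), step))

    # Each slice becomes one Arrow record batch, roughly what _create_batch
    # does per slice in the diff (minus the type coercion).
    batches = [pa.RecordBatch.from_pandas(s) for s in pdf_slices]
    print([b.num_rows for b in batches])   # [4, 4, 2]
    ```
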
---
