Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r150445421
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -438,28 +438,70 @@ def _get_numpy_record_dtypes(self, rec):
                     curr_type = 'datetime64[us]'
                     has_rec_fix = True
                 record_type_list.append((str(col_names[i]), curr_type))
    -        return record_type_list if has_rec_fix else None
    +        return np.dtype(record_type_list) if has_rec_fix else None
     
    -    def _convert_from_pandas(self, pdf, schema):
    +    def _convert_from_pandas(self, pdf):
             """
              Convert a pandas.DataFrame to list of records that can be used to make a DataFrame
    -         :return tuple of list of records and schema
    +         :return list of records
             """
    -        # If no schema supplied by user then get the names of columns only
    -        if schema is None:
    -            schema = [str(x) for x in pdf.columns]
     
             # Convert pandas.DataFrame to list of numpy records
             np_records = pdf.to_records(index=False)
     
             # Check if any columns need to be fixed for Spark to infer properly
             if len(np_records) > 0:
    -            record_type_list = self._get_numpy_record_dtypes(np_records[0])
    -            if record_type_list is not None:
    -                return [r.astype(record_type_list).tolist() for r in np_records], schema
    +            record_dtype = self._get_numpy_record_dtype(np_records[0])
    +            if record_dtype is not None:
    +                return [r.astype(record_dtype).tolist() for r in np_records]
     
             # Convert list of numpy records to python lists
    -        return [r.tolist() for r in np_records], schema
    +        return [r.tolist() for r in np_records]
    +
    +    def _create_from_pandas_with_arrow(self, pdf, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then sending to the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        from pyspark.serializers import ArrowSerializer, _create_batch
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_type, TimestampType
    +        from pandas.api.types import is_datetime64_dtype, is_datetime64tz_dtype
    +
    +        # Determine arrow types to coerce data when creating batches
    +        if isinstance(schema, StructType):
    +            arrow_types = [to_arrow_type(f.dataType) for f in schema.fields]
    +        elif isinstance(schema, DataType):
    +            raise ValueError("Single data type %s is not supported with Arrow" % str(schema))
    +        else:
    +            # Any timestamps must be coerced to be compatible with Spark
    +            arrow_types = [to_arrow_type(TimestampType())
    +                           if is_datetime64_dtype(t) or is_datetime64tz_dtype(t) else None
    +                           for t in pdf.dtypes]
    +
    +        # Slice the DataFrame to be batched
    +        step = -(-len(pdf) // self.sparkContext.defaultParallelism)  # round int up
    +        pdf_slices = (pdf[start:start + step] for start in xrange(0, len(pdf), step))
    +
    +        # Create Arrow record batches
    +        batches = [_create_batch([(c, t) for (_, c), t in zip(pdf_slice.iteritems(), arrow_types)])
    +                   for pdf_slice in pdf_slices]
    +
    +        # Create the Spark schema from the first Arrow batch (always at least 1 batch after slicing)
    +        if isinstance(schema, (list, tuple)):
    +            struct = from_arrow_schema(batches[0].schema)
    --- End diff ---
    
    @BryanCutler, I think we would hit the same issue as SPARK-15244 in this code path as well. Would you mind opening a follow-up with a simple test if that turns out to be true?
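
    To make the "simple test" part concrete, a rough sketch is below. It assumes the concern is that the list-of-column-names branch of `_create_from_pandas_with_arrow` could diverge from the non-Arrow path (for instance around unicode column names); that reading of SPARK-15244, the pytest-style `spark` fixture, and the test name are all placeholders rather than anything settled in this thread.

    ```python
    # Hypothetical sketch, not part of this PR: compare the Arrow and non-Arrow
    # createDataFrame code paths when the schema is a plain list of column names.
    import pandas as pd


    def test_create_dataframe_with_list_schema_arrow(spark):
        pdf = pd.DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]})
        col_names = [u"x", u"y"]  # unicode names, SPARK-15244-style input

        # Arrow-enabled path (the branch added in this PR)
        spark.conf.set("spark.sql.execution.arrow.enabled", "true")
        df_arrow = spark.createDataFrame(pdf, schema=col_names)

        # Existing non-Arrow path as the reference behaviour
        spark.conf.set("spark.sql.execution.arrow.enabled", "false")
        df_no_arrow = spark.createDataFrame(pdf, schema=col_names)

        # Both code paths should agree on the resulting column names and types
        assert df_arrow.columns == df_no_arrow.columns
        assert df_arrow.schema == df_no_arrow.schema
    ```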

