GitHub user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r144945183
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelize. If a schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerly reads the file so we can delete right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    This brings up a good point. The schema should usually be the same, but what should the behavior be if the user specifies the wrong schema?
    
    1. If we use the passed-in schema and it is wrong (the line in the comment above), then an error might surface only later, when an operation is performed. For example, if a string column were specified as DoubleType, that would produce:
    "Caused by: java.lang.UnsupportedOperationException at org.apache.spark.sql.execution.vectorized.ArrowColumnVector$ArrowVectorAccessor.getDouble(ArrowColumnVector.java:395)"
    
    2. We could check that the user-supplied schema matches the ArrowRecordBatch schema and fail immediately if they are not equal. I'm not sure whether there are cases where you wouldn't want it to fail, such as differing integer types.
    
    3. Always use the schema from the ArrowRecordBatch (how it currently is in this PR).
    
    I'm thinking that (2) is the best, since it's the safest and would produce a clear error message.
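
For illustration, option (2) could look roughly like the sketch below. It models a schema as a plain list of (column name, type name) pairs, and `check_schema` is a hypothetical helper. Neither is the pyarrow or PySpark API, so a real check would need to compare the actual schema objects instead.

```python
# Hypothetical stand-in: a schema is an ordered list of (column name, type name)
# pairs. This is NOT the pyarrow or pyspark schema API.


def check_schema(user_schema, batch_schema):
    """Raise TypeError if the user-supplied schema differs from the batch schema."""
    if len(user_schema) != len(batch_schema):
        raise TypeError(
            "Schema mismatch: expected %d columns, data has %d"
            % (len(user_schema), len(batch_schema)))
    # Collect every differing column so the message points at the exact problem.
    mismatches = [
        "column %d: expected %s %s, data has %s %s"
        % (i, u_name, u_type, b_name, b_type)
        for i, ((u_name, u_type), (b_name, b_type))
        in enumerate(zip(user_schema, batch_schema))
        if (u_name, u_type) != (b_name, b_type)
    ]
    if mismatches:
        raise TypeError("Schema mismatch: " + "; ".join(mismatches))


# A string column declared as double is rejected up front, before any
# operation runs on the DataFrame.
try:
    check_schema([("a", "double")], [("a", "string")])
except TypeError as e:
    print(e)
```

A strict equality check like this would also reject differing integer widths, which is the case I'm unsure about above. Relaxing it would mean deciding which type pairs count as compatible.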


---

