Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19459#discussion_r145028238
  
    --- Diff: python/pyspark/sql/session.py ---
    @@ -414,6 +415,43 @@ def _createFromLocal(self, data, schema):
             data = [schema.toInternal(row) for row in data]
             return self._sc.parallelize(data), schema
     
    +    def _createFromPandasWithArrow(self, df, schema):
    +        """
    +        Create a DataFrame from a given pandas.DataFrame by slicing it 
into partitions, converting
    +        to Arrow data, then reading into the JVM to parallelize. If a 
schema is passed in, the
    +        data types will be used to coerce the data in Pandas to Arrow 
conversion.
    +        """
    +        import os
    +        from tempfile import NamedTemporaryFile
    +        from pyspark.serializers import ArrowSerializer
    +        from pyspark.sql.types import from_arrow_schema, to_arrow_schema
    +        import pyarrow as pa
    +
    +        # Slice the DataFrame into batches
    +        step = -(-len(df) // self.sparkContext.defaultParallelism)  # 
round int up
    +        df_slices = (df[start:start + step] for start in xrange(0, 
len(df), step))
    +        arrow_schema = to_arrow_schema(schema) if schema is not None else 
None
    +        batches = [pa.RecordBatch.from_pandas(df_slice, 
schema=arrow_schema, preserve_index=False)
    +                   for df_slice in df_slices]
    +
    +        # write batches to temp file, read by JVM (borrowed from 
context.parallelize)
    +        tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
    +        try:
    +            serializer = ArrowSerializer()
    +            serializer.dump_stream(batches, tempFile)
    +            tempFile.close()
    +            readRDDFromFile = self._jvm.PythonRDD.readRDDFromFile
    +            jrdd = readRDDFromFile(self._jsc, tempFile.name, len(batches))
    +        finally:
    +            # readRDDFromFile eagerly reads the file so we can delete 
right after.
    +            os.unlink(tempFile.name)
    +
    +        # Create the Spark DataFrame, there will be at least 1 batch
    +        schema = from_arrow_schema(batches[0].schema)
    --- End diff --
    
    Could we check ahead and fall back to `toPandas` without Arrow? I think I 
am seeing some differences in some cases, for example:
    
    ```python
    import pandas as pd
    import numpy as np
    
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    spark.createDataFrame(pd.DataFrame(data={"intcol": [0.8]}), schema="intcol 
STRING").show()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    spark.createDataFrame(pd.DataFrame(data={"intcol": [0.8]}), schema="intcol 
STRING").show()
    ```
    
    WDYT @BryanCutler and @ueshin ?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to