linar-jether commented on PR #29719:
URL: https://github.com/apache/spark/pull/29719#issuecomment-1820601646
@samkumar @JacekPliszka
Using spark 3.5, here's a sample for converting an RDD of arrow record
batches to a spark DataFrame, this still uses internal spark methods, so no
guarantees of future compatibility.
Hopefully this can be incorporated into a proper api...
```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.pandas.types import from_arrow_schema
import pyarrow as pa
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
def map_to_record_batch(i):
df = pd.DataFrame(np.random.randn(10, 4) * i, columns=list('ABCD'))
return pa.RecordBatch.from_pandas(df)
if __name__ == '__main__':
spark = SparkSession.builder.appName("Python Arrow-in-Spark
example").getOrCreate()
# Create an RDD of Arrow RecordBatch objects
ardd = spark.sparkContext.parallelize([1, 2, 3, 4], 4)
ardd = ardd.map(map_to_record_batch).cache() # cache to avoid
recomputing after inferring schema
# Peek at the first record batch to infer schema
arrow_schema = ardd.first().schema
spark_schema = from_arrow_schema(arrow_schema)
# Convert RDD[RecordBatch] to RDD[bytearray] for serialization
ardd = ardd.map(lambda x: bytearray(x.serialize()))
# Create a spark DataFrame from RDD[bytearray] and schema
jrdd = ardd._to_java_object_rdd()
jdf = spark._jvm.PythonSQLUtils.toDataFrame(jrdd, spark_schema.json(),
spark._jsparkSession)
df = DataFrame(jdf, spark)
df._schema = spark_schema
df.show()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]