Hello devs,

I know a lot of great work has been done recently on converting between
pandas and Spark DataFrames using Apache Arrow, but I faced a specific pain
point on a low-memory setup without Arrow.

Specifically, I was hitting a driver OOM when running toPandas on a small
dataset (<100 MB compressed).  There was a discussion about toPandas being
slow
<http://apache-spark-developers-list.1001551.n3.nabble.com/toPandas-very-slow-td16794.html>
in March 2016 due to a self.collect().  A solution was found that creates
pandas DataFrames or NumPy arrays using mapPartitions for each partition
<https://gist.github.com/joshlk/871d58e01417478176e7>, but it was never
implemented back into dataframe.py.

I understand that using Apache Arrow will solve this, but in a setup
without Arrow (like the one where I faced the pain point), I investigated
the memory usage of toPandas and to_pandas (one DataFrame per partition) and
experimented with the number of partitions.  The findings are here
<https://gist.github.com/mrandrewandrade/7f5ff26c5275376d3cd5e427ca74d50f>.

In summary, on a 147 MB dataset, toPandas used about 784 MB of memory,
while converting partition by partition (with 100 partitions) had an
overhead of 76.30 MB and took almost half the time to run.  I realize that
Arrow solves this, but the modification is quite small and would greatly
help anyone who isn't able to use Arrow.

Would a PR [1] from me to address this issue be welcome?

Thanks,

Andrew

[1] From Josh's Gist

import pandas as pd


def _map_to_pandas(rdds):
    """ Needs to be here due to pickling issues """
    return [pd.DataFrame(list(rdds))]

def toPandas(df, n_partitions=None):
    """
    Returns the contents of `df` as a local `pandas.DataFrame` in a speedy
    fashion. The DataFrame is repartitioned if `n_partitions` is passed.
    :param df:              pyspark.sql.DataFrame
    :param n_partitions:    int or None
    :return:                pandas.DataFrame
    """
    if n_partitions is not None: df = df.repartition(n_partitions)
    df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
    df_pand = pd.concat(df_pand)
    df_pand.columns = df.columns
    return df_pand
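
For anyone who wants to try the idea without a Spark cluster, here is a minimal pandas-only sketch of the same per-partition pattern. The names partition_to_frame and concat_partitions and the sample partitions list are hypothetical stand-ins for df.rdd.mapPartitions(...).collect(). Skipping empty partitions and passing ignore_index=True are assumptions of this sketch and are not part of Josh's gist.

```python
import pandas as pd


def partition_to_frame(rows):
    """Turn one partition (an iterable of row tuples) into a one-element
    list holding a DataFrame, mirroring _map_to_pandas above."""
    return [pd.DataFrame(list(rows))]


def concat_partitions(partitions, columns):
    """Build one DataFrame per partition, then concatenate them once."""
    frames = []
    for rows in partitions:
        frames.extend(partition_to_frame(rows))
    # Assumption of this sketch: drop frames from empty partitions,
    # since they carry no columns and no rows.
    frames = [f for f in frames if not f.empty]
    if not frames:
        return pd.DataFrame(columns=columns)
    result = pd.concat(frames, ignore_index=True)
    result.columns = columns
    return result


# Hypothetical stand-in for the partitions of a Spark DataFrame:
# three partitions, one of them empty.
partitions = [[(1, "a"), (2, "b")], [], [(3, "c")]]
local_df = concat_partitions(partitions, ["id", "label"])
```

Each partition is turned into a small DataFrame first, so only compact frames are combined at the end instead of one large list of row objects.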
